[Тестирование IT-систем, Java, Apache, Микросервисы] Сервисы с Apache Kafka и тестирование

Автор Сообщение
news_bot ®

Стаж: 6 лет 9 месяцев
Сообщений: 27286

Создавать темы news_bot ® написал(а)
09-Янв-2021 20:30

Когда сервисы интегрируются при помощи Kafka очень удобно использовать REST API, как универсальный и стандартный способ обмена сообщениями. При увеличении количества сервисов сложность коммуникаций увеличивается. Для контроля можно и нужно использовать интеграционное тестирование. Такие библиотеки как testcontainers или EmbeddedServer прекрасно помогают организовать такое тестирование. Существуют много примеров для micronaut, Spring Boot и т.д. Но в этих примерах опущены некоторые детали, которые не позволяют с первого раза запустить код. В статье приводятся примеры с подробным описанием и ссылками на код.
Пример
Для простоты можно принять такой REST API.
/runs — POST-метод. Инициализирует запрос в канал связи. Принимает данные и возвращает ключ запроса.
/runs/{key}/status – GET-метод. По ключу возвращает статус запроса. Может принимать следующие значения: UNKNOWN, RUNNING, DONE.
/runs /{key} – GET-метод. По ключу возвращает результат запроса.
Подобный API реализован у livy, хотя и для других задач.
Реализация
Будут использоваться: micronaut, Spring Boot.
micronaut
Контроллер для API.
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.annotation.Post;
import io.reactivex.Maybe;
import io.reactivex.schedulers.Schedulers;
import javax.inject.Inject;
import java.util.UUID;
@Controller("/runs")
public class RunController {
    @Inject
    RunClient runClient;
    @Inject
    RunCache runCache;
    @Post
    public String runs(@Body String body) {
        String key = UUID.randomUUID().toString();
        runCache.statuses.put(key, RunStatus.RUNNING);
        runCache.responses.put(key, "");
        runClient.sendRun(key, new Run(key, RunType.REQUEST, "", body));
        return key;
    }
    @Get("/{key}/status")
    public Maybe<RunStatus> getRunStatus(String key) {
        return Maybe.just(key)
                .subscribeOn(Schedulers.io())
                .map(it -> runCache.statuses.getOrDefault(it, RunStatus.UNKNOWN));
    }
    @Get("/{key}")
    public Maybe<String> getRunResponse(String key) {
        return Maybe.just(key)
                .subscribeOn(Schedulers.io())
                .map(it -> runCache.responses.getOrDefault(it, ""));
    }
}

Отправка сообщений в kafka.
import io.micronaut.configuration.kafka.annotation.*;
import io.micronaut.messaging.annotation.Body;
@KafkaClient
public interface RunClient {
    @Topic("runs")
    void sendRun(@KafkaKey String key, @Body Run run);
}

Получение сообщений из kafka.
import io.micronaut.configuration.kafka.annotation.*;
import io.micronaut.messaging.annotation.Body;
import javax.inject.Inject;
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
public class RunListener {
    @Inject
    RunCalculator runCalculator;
    @Topic("runs")
    public void receive(@KafkaKey String key, @Body Run run) {
        runCalculator.run(key, run);
    }
}

Обработка сообщений происходит в RunCalculator. Для тестов используется особая реализация, в которой происходит переброска сообщений.
import io.micronaut.context.annotation.Replaces;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.UUID;
@Replaces(RunCalculatorImpl.class)
@Singleton
public class RunCalculatorWithWork implements RunCalculator {
    @Inject
    RunClient runClient;
    @Inject
    RunCache runCache;
    @Override
    public void run(String key, Run run) {
        if (RunType.REQUEST.equals(run.getType())) {
            String runKey = run.getKey();
            String newKey = UUID.randomUUID().toString();
            String runBody = run.getBody();
            runClient.sendRun(newKey, new Run(newKey, RunType.RESPONSE, runKey, runBody + "_calculated"));
        } else if (RunType.RESPONSE.equals(run.getType())) {
            runCache.statuses.replace(run.getResponseKey(), RunStatus.DONE);
            runCache.responses.replace(run.getResponseKey(), run.getBody());
        }
    }
}

Тест.
import io.micronaut.http.HttpRequest;
import io.micronaut.http.client.HttpClient;
import static org.junit.jupiter.api.Assertions.assertEquals;
public abstract class RunBase {
    void run(HttpClient client) {
        String key = client.toBlocking().retrieve(HttpRequest.POST("/runs", "body"));
        RunStatus runStatus = RunStatus.UNKNOWN;
        while (runStatus != RunStatus.DONE) {
            runStatus = client.toBlocking().retrieve(HttpRequest.GET("/runs/" + key + "/status"), RunStatus.class);
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        String response = client.toBlocking().retrieve(HttpRequest.GET("/runs/" + key), String.class);
        assertEquals("body_calculated", response);
    }
}

Для использования EmbeddedServer необходимо.
Подключить библиотеки:
testImplementation("org.apache.kafka:kafka-clients:2.6.0:test")
testImplementation("org.apache.kafka:kafka_2.12:2.6.0")
testImplementation("org.apache.kafka:kafka_2.12:2.6.0:test")

Тест может выглядеть так.
import io.micronaut.context.ApplicationContext;
import io.micronaut.http.client.HttpClient;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
public class RunKeTest extends RunBase {
    @Test
    void test() {
        Map<String, Object> properties = new HashMap<>();
        properties.put("kafka.bootstrap.servers", "localhost:9092");
        properties.put("kafka.embedded.enabled", "true");
        try (EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer.class, properties)) {
            ApplicationContext applicationContext = embeddedServer.getApplicationContext();
            HttpClient client = applicationContext.createBean(HttpClient.class, embeddedServer.getURI());
            run(client);
        }
    }
}

Для использования testcontainers необходимо.
Подключить библиотеки:
implementation("org.testcontainers:kafka:1.14.3")

Тест может выглядеть так.
import io.micronaut.context.ApplicationContext;
import io.micronaut.http.client.HttpClient;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
import java.util.HashMap;
import java.util.Map;
public class RunTcTest extends RunBase {
    @Test
    public void test() {
        try (KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.3"))) {
            kafka.start();
            Map<String, Object> properties = new HashMap<>();
            properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers());
            try (EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer.class, properties)) {
                ApplicationContext applicationContext = embeddedServer.getApplicationContext();
                HttpClient client = applicationContext.createBean(HttpClient.class, embeddedServer.getURI());
                run(client);
            }
        }
    }
}

Spring Boot
Контроллер для API.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.UUID;
@RestController
@RequestMapping("/runs")
public class RunController {
    @Autowired
    private RunClient runClient;
    @Autowired
    private RunCache runCache;
    @PostMapping()
    public String runs(@RequestBody String body) {
        String key = UUID.randomUUID().toString();
        runCache.statuses.put(key, RunStatus.RUNNING);
        runCache.responses.put(key, "");
        runClient.sendRun(key, new Run(key, RunType.REQUEST, "", body));
        return key;
    }
    @GetMapping("/{key}/status")
    public RunStatus getRunStatus(@PathVariable String key) {
        return runCache.statuses.getOrDefault(key, RunStatus.UNKNOWN);
    }
    @GetMapping("/{key}")
    public String getRunResponse(@PathVariable String key) {
        return runCache.responses.getOrDefault(key, "");
    }
}

Отправка сообщений в kafka.
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class RunClient {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @Autowired
    private ObjectMapper objectMapper;
    public void sendRun(String key, Run run) {
        String data = "";
        try {
            data = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(run);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        kafkaTemplate.send("runs", key, data);
    }
}

Получение сообщений из kafka.
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class RunListener {
    @Autowired
    private ObjectMapper objectMapper;
    @Autowired
    private RunCalculator runCalculator;
    @KafkaListener(topics = "runs", groupId = "m-group")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        String key = consumerRecord.key().toString();
        Run run = null;
        try {
            run = objectMapper.readValue(consumerRecord.value().toString(), Run.class);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        runCalculator.run(key, run);
    }
}

Обработка сообщений происходит в RunCalculator. Для тестов используется особая реализация, в которой происходит переброска сообщений.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
public class RunCalculatorWithWork implements RunCalculator {
    @Autowired
    RunClient runClient;
    @Autowired
    RunCache runCache;
    @Override
    public void run(String key, Run run) {
        if (RunType.REQUEST.equals(run.getType())) {
            String runKey = run.getKey();
            String newKey = UUID.randomUUID().toString();
            String runBody = run.getBody();
            runClient.sendRun(newKey, new Run(newKey, RunType.RESPONSE, runKey, runBody + "_calculated"));
        } else if (RunType.RESPONSE.equals(run.getType())) {
            runCache.statuses.replace(run.getResponseKey(), RunStatus.DONE);
            runCache.responses.replace(run.getResponseKey(), run.getBody());
        }
    }
}

Тест.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
public abstract class RunBase {
    void run(MockMvc mockMvc, ObjectMapper objectMapper) throws Exception {
        MvcResult keyResult = mockMvc.perform(MockMvcRequestBuilders.post("/runs")
                .content("body")
                .contentType(MediaType.APPLICATION_JSON)
                .accept(MediaType.APPLICATION_JSON))
                .andExpect(status().isOk())
                .andReturn();
        String key = keyResult.getResponse().getContentAsString();
        RunStatus runStatus = RunStatus.UNKNOWN;
        while (runStatus != RunStatus.DONE) {
            MvcResult statusResult = mockMvc.perform(MockMvcRequestBuilders.get("/runs/" + key + "/status")
                    .contentType(MediaType.APPLICATION_JSON)
                    .accept(MediaType.APPLICATION_JSON))
                    .andExpect(status().isOk())
                    .andReturn();
            runStatus = objectMapper.readValue(statusResult.getResponse().getContentAsString(), RunStatus.class);
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        String response = mockMvc.perform(MockMvcRequestBuilders.get("/runs/" + key)
                .contentType(MediaType.APPLICATION_JSON)
                .accept(MediaType.APPLICATION_JSON))
                .andExpect(status().isOk())
                .andReturn().getResponse().getContentAsString();
        assertEquals("body_calculated", response);
    }
}

Для использования EmbeddedServer необходимо.
Подключить библиотеки:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.5.10.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>2.5.10.RELEASE</version>
    <scope>test</scope>
</dependency>

Тест может выглядеть так.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.web.servlet.MockMvc;
@AutoConfigureMockMvc
@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
@Import(RunKeTest.RunKeTestConfiguration.class)
public class RunKeTest extends RunBase {
    @Autowired
    private MockMvc mockMvc;
    @Autowired
    private ObjectMapper objectMapper;
    @Test
    void test() throws Exception {
        run(mockMvc, objectMapper);
    }
    @TestConfiguration
    static class RunKeTestConfiguration {
        @Autowired
        private RunCache runCache;
        @Autowired
        private RunClient runClient;
        @Bean
        public RunCalculator runCalculator() {
            RunCalculatorWithWork runCalculatorWithWork = new RunCalculatorWithWork();
            runCalculatorWithWork.runCache = runCache;
            runCalculatorWithWork.runClient = runClient;
            return runCalculatorWithWork;
        }
    }
}

Для использования testcontainers необходимо.
Подключить библиотеки:
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>1.14.3</version>
    <scope>test</scope>
</dependency>

Тест может выглядеть так.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.ClassRule;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.test.web.servlet.MockMvc;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
import java.util.HashMap;
import java.util.Map;
@AutoConfigureMockMvc
@SpringBootTest
@Import(RunTcTest.RunTcTestConfiguration.class)
public class RunTcTest extends RunBase {
    @ClassRule
    public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.3"));
    static {
        kafka.start();
    }
    @Autowired
    private MockMvc mockMvc;
    @Autowired
    private ObjectMapper objectMapper;
    @Test
    void test() throws Exception {
        run(mockMvc, objectMapper);
    }
    @TestConfiguration
    static class RunTcTestConfiguration {
        @Autowired
        private RunCache runCache;
        @Autowired
        private RunClient runClient;
        @Bean
        ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
        @Bean
        public ConsumerFactory<Integer, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }
        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "m-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return props;
        }
        @Bean
        public ProducerFactory<String, String> producerFactory() {
            Map<String, Object> configProps = new HashMap<>();
            configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
            configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return new DefaultKafkaProducerFactory<>(configProps);
        }
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
        @Bean
        public RunCalculator runCalculator() {
            RunCalculatorWithWork runCalculatorWithWork = new RunCalculatorWithWork();
            runCalculatorWithWork.runCache = runCache;
            runCalculatorWithWork.runClient = runClient;
            return runCalculatorWithWork;
        }
    }
}

Перед всеми тестами необходимо стартовать kafka. Это делается вот таким вот образом:
kafka.start();

Дополнительные свойства для kafka в тестах можно задать в ресурсном файле.
application.yml

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest

Ресурсы и ссылки
Код для micronaut
Код для Spring Boot
PART 1: TESTING KAFKA MICROSERVICES WITH MICRONAUT
Testing Kafka and Spring Boot
Micronaut Kafka
Spring for Apache Kafka
===========
Источник:
habr.com
===========

Похожие новости: Теги для поиска: #_testirovanie_itsistem (Тестирование IT-систем), #_java, #_apache, #_mikroservisy (Микросервисы), #_java, #_micronaut, #_spring_boot, #_apache_kafka, #_kafka, #_junit, #_microservices, #_testirovanie_itsistem (
Тестирование IT-систем
)
, #_java, #_apache, #_mikroservisy (
Микросервисы
)
Профиль  ЛС 
Показать сообщения:     

Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы

Текущее время: 22-Ноя 12:38
Часовой пояс: UTC + 5