[Тестирование IT-систем, Java, Apache] Apache Kafka и тестирование с Kafka Server

Автор Сообщение
news_bot ®

Стаж: 6 лет 9 месяцев
Сообщений: 27286

Создавать темы news_bot ® написал(а)
12-Ноя-2020 16:32

Введение
Существуют различные способы для написания тестов с использованием Apache Kafka. К примеру, можно использовать TestContainers и EmbeddedKafka. Об этом можно почитать, к примеру, вот здесь: Подводные камни тестирования Kafka Streams. Но существует и вариант для написания тестов с использованием KafkaServer.
Что будет тестироваться?
Предположим, необходимо разработать сервис отправки сообщений по различным каналам: email, telegram и т.п.
Пусть имя сервиса будет: SenderService.
Сервис должен: слушать заданный канал, выделять из канала нужные ему сообщения, разбирать сообщения и отправлять по нужному каналу для конечной доставки сообщений.
Для проверки сервиса необходимо сформировать сообщение для отправки с использованием канала отправки почты и убедиться в том, что сообщение было передано в конечный канал.
Конечно, в реальных приложениях тесты будут сложнее. Но для иллюстрации выбранного подхода, такого теста будет достаточно.
Сервис и тест реализованы с использованием: Java 1.8, Kafka 2.1.0, JUnit 5.5.2, Maven 3.6.1.
Сервис
Сервис будет иметь возможность начать работу и остановить свою работу.
void start()
void stop()

При старте необходимо задать, как минимум, следующие параметры:
String bootstrapServers
String senderTopic
EmailService emailService

bootstrapServers – адрес kafka.
senderTopic – топик, из которого будут считываться сообщения.
emailService – сервис для конечной отправки сообщений по почте.
В реальном сервисе таких конечных сервисов будет столько же сколько и конечных каналов отправки сообщений.
Теперь необходим «потребитель», который слушает канал, фильтрует и отправляет сообщения в конечные каналы. Количество таких «потребителей» можно выбирать. Подход для написания «потребителя» описан вот здесь: Introducing the Kafka Consumer: Getting Started with the New Apache Kafka 0.9 Consumer Client.
Collection<AutoCloseable> closeables = new ArrayList<>();
ExecutorService senderTasksExecutor = Executors.newFixedThreadPool(senderTasksN);
ExecutorService tasksExecutorService = Executors.newFixedThreadPool(tasksN);
for (int i = 0; i < senderTasksN; i++) {
    SenderConsumerLoop senderConsumerLoop =
            new SenderConsumerLoop(
                    bootstrapServers,
                    senderTopic,
                    "sender",
                    "sender",
                    tasksExecutorService,
                    emailService
            );
    closeables.add(senderConsumerLoop);
    senderTasksExecutor.submit(senderConsumerLoop);
}

В цикле создается экземпляр «потребителя», запоминается в коллекции и запускается через сервис запуска задач.
При выполнении этого кода «потребители» начинают работать. Сервис ждет их завершения или сигнала для остановки.
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    for (AutoCloseable autoCloseable : closeables) {
        try {
            autoCloseable.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    senderTasksExecutor.shutdown();
    tasksExecutorService.shutdown();
    stop();
    try {
        senderTasksExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}));

При завершении необходимо освободить ресурсы.
«Потребитель»
«Потребитель» имеет следующие публичные методы:
void run()
void close()

Основной метод: run.
@Override
public void run() {
    kafkaConsumer = createKafkaConsumerStringString(bootstrapServers, clientId, groupId);
    kafkaConsumer.subscribe(Collections.singleton(topic));
    while (true) {
        calculate(kafkaConsumer.poll(Duration.ofSeconds(1)));
    }
}

По входным параметрам создается экземпляр «kafka-потребителя». «kafka-потребитель» подписывается на заданный топик. В бесконечном цикле выбираются записи из топика. И отправляются на обработку.
Для иллюстрации json-сообщения будут иметь несколько полей, которые будут задавать и тип сообщения, и данные для отправки.
Пример сообщения:
{
  "subject": {
    "subject_type": "send"
  },
  "body": {
    "method": "email",
    "recipients": "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml",
    "title": "42",
    "message": "73"
  }
}

subject_type — тип сообщения. Для сервиса нужно значение «send».
method – тип конечного сервиса для отправки. «email» — отправка через почту.
recipients – список получателей.
title – заголовок для сообщения.
message – сообщение.
Обработка всех записей:
void calculate(ConsumerRecords<String, String> records) {
    for (ConsumerRecord<String, String> record : records) {
        calculate(record);
    }
}

Обработка одной записи:
void calculate(ConsumerRecord<String, String> record) {
            JSONParser jsonParser = new JSONParser();
            Object parsedObject = null;
            try {
                parsedObject = jsonParser.parse(record.value());
            } catch (ParseException e) {
                e.printStackTrace();
            }
            if (parsedObject instanceof JSONObject) {
                JSONObject jsonObject = (JSONObject) parsedObject;
                JSONObject jsonSubject = (JSONObject) jsonObject.get(SUBJECT);
                String subjectType = jsonSubject.get(SUBJECT_TYPE).toString();
                if (SEND.equals(subjectType)) {
                    JSONObject jsonBody = (JSONObject) jsonObject.get(BODY);
                    calculate(jsonBody);
                }
            }
        }

Распределение сообщений по типу:
void calculate(JSONObject jsonBody) {
    String method = jsonBody.get(METHOD).toString();
    if (EMAIL_METHOD.equals(method)) {
        String recipients = jsonBody.get(RECIPIENTS).toString();
        String title = jsonBody.get(TITLE).toString();
        String message = jsonBody.get(MESSAGE).toString();
        sendEmail(recipients, title, message);
    }
}

Отправка в конечную систему:
void sendEmail(String recipients, String title, String message) {
    tasksExecutorService.submit(() -> emailService.send(recipients, title, message));
}

Отправка сообщений происходит через сервис исполнения задач.
Ожидания завершения отправки не происходит.
Создание «kafka-потребителя»:
static KafkaConsumer<String, String> createKafkaConsumerStringString(
        String bootstrapServers,
        String clientId,
        String groupId
) {
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    properties.setProperty(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return new KafkaConsumer<>(properties);
}

Интерфейс для писем:
interface EmailService {
    void send(String recipients, String title, String message);
}

Тест
Для теста понадобиться следующее.
Адрес «kafka-сервера».
Порт для «kafka-сервера».
Имя топика.
Сервис для управления «kafka-сервером». Будет описан ниже.
public class SenderServiceTest {
    @Test
    void consumeEmail() throws InterruptedException {
        String brokerHost = "127.0.0.1";
        int brokerPort = 29092;
        String bootstrapServers = brokerHost + ":" + brokerPort;
        String senderTopic = "sender_data";
        try (KafkaServerService kafkaServerService = new KafkaServerService(brokerHost, brokerPort)) {
            kafkaServerService.start();
            kafkaServerService.createTopic(senderTopic);
        }
    }
}

Задаются параметры. Создается сервис для управления «kafka-сервером». «kafka-сервером» стартует. Создается необходимый топик.
Создается «mock» конечного сервиса для отправки сообщений:
SenderService.EmailService emailService = mock(SenderService.EmailService.class);

Создается сам сервис и стартует:
SenderService senderService = new SenderService(bootstrapServers, senderTopic, emailService);
senderService.start();

Задаются параметры для сообщения:
String recipients = "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml";
String title = "42";
String message = "73";

Отправляется сообщение в канал:
kafkaServerService.send(senderTopic, key(), createMessage(EMAIL_METHOD, recipients, title, message));

Ожидание:
Thread.sleep(6000);

Проверка, что сообщение дошло до конечного сервиса:
verify(emailService).send(recipients, title, message);

Остановка:
senderService.stop();

Все вместе:
public class SenderServiceTest {
    @Test
    void consumeEmail() throws InterruptedException {
        String brokerHost = "127.0.0.1";
        int brokerPort = 29092;
        String bootstrapServers = brokerHost + ":" + brokerPort;
        String senderTopic = "sender_data";
        try (KafkaServerService kafkaServerService = new KafkaServerService(brokerHost, brokerPort)) {
            kafkaServerService.start();
            kafkaServerService.createTopic(senderTopic);
            SenderService.EmailService emailService = mock(SenderService.EmailService.class);
            SenderService senderService = new SenderService(bootstrapServers, senderTopic, emailService);
            senderService.start();
            String recipients = "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml";
            String title = "42";
            String message = "73";
            kafkaServerService.send(senderTopic, key(), createMessage(EMAIL_METHOD, recipients, title, message));
            Thread.sleep(6000);
            verify(emailService).send(recipients, title, message);
            senderService.stop();
        }
    }
}

Вспомогательный код:
public class SenderFactory {
    public static final String SUBJECT = "subject";
    public static final String SUBJECT_TYPE = "subject_type";
    public static final String BODY = "body";
    public static final String METHOD = "method";
    public static final String EMAIL_METHOD = "email";
    public static final String RECIPIENTS = "recipients";
    public static final String TITLE = "title";
    public static final String MESSAGE = "message";
    public static final String SEND = "send";
    public static String key() {
        return UUID.randomUUID().toString();
    }
    public static String createMessage(String method, String recipients, String title, String message) {
        Map<String, Object> map = new HashMap<>();
        Map<String, Object> subject = new HashMap<>();
        Map<String, Object> body = new HashMap<>();
        map.put(SUBJECT, subject);
        subject.put(SUBJECT_TYPE, SEND);
        map.put(BODY, body);
        body.put(METHOD, method);
        body.put(RECIPIENTS, recipients);
        body.put(TITLE, title);
        body.put(MESSAGE, message);
        return JSONObject.toJSONString(map);
    }
}

Сервис для управления «kafka-сервером»
Основные методы:
void start()
void close()
void createTopic(String topic)

В методе «start» происходит создание сервера и вспомогательных объектов.
Создание «zookeeper» и сохранение его адреса:
zkServer = new EmbeddedZookeeper();
String zkConnect = zkHost + ":" + zkServer.port();

Создание клиента «zookeeper»:
zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
zkUtils = ZkUtils.apply(zkClient, false);

Задание свойств для сервера:
Properties brokerProps = new Properties();
brokerProps.setProperty("zookeeper.connect", zkConnect);
brokerProps.setProperty("broker.id", "0");
try {
    brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
} catch (IOException e) {
    throw new RuntimeException(e);
}
brokerProps.setProperty("listeners", "PLAINTEXT://" + brokerHost + ":" + brokerPort);
brokerProps.setProperty("offsets.topic.replication.factor", "1");
KafkaConfig config = new KafkaConfig(brokerProps);

Создание сервера:
kafkaServer = TestUtils.createServer(config, new MockTime());

Все вместе:
public void start() {
    zkServer = new EmbeddedZookeeper();
    String zkConnect = zkHost + ":" + zkServer.port();
    zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
    zkUtils = ZkUtils.apply(zkClient, false);
    Properties brokerProps = new Properties();
    brokerProps.setProperty("zookeeper.connect", zkConnect);
    brokerProps.setProperty("broker.id", "0");
    try {
        brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
    brokerProps.setProperty("listeners", "PLAINTEXT://" + brokerHost + ":" + brokerPort);
    brokerProps.setProperty("offsets.topic.replication.factor", "1");
    KafkaConfig config = new KafkaConfig(brokerProps);
    kafkaServer = TestUtils.createServer(config, new MockTime());
}

Остановка сервиса:
@Override
public void close() {
    kafkaServer.shutdown();
    zkClient.close();
    zkServer.shutdown();
}

Создание топика:
public void createTopic(String topic) {
    AdminUtils.createTopic(
            zkUtils, topic, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
}

Заключение
В заключении нужно отметить, что приведенный здесь код лишь иллюстрирует выбранный способ.
Для создания и тестирования сервисов с использованием «kafka» можно обратиться к следующему ресурсу:
kafka-streams-examples
Ссылки и ресурсы
Исходный код
Код для тестирования с «kafka-сервером»
===========
Источник:
habr.com
===========

Похожие новости: Теги для поиска: #_testirovanie_itsistem (Тестирование IT-систем), #_java, #_apache, #_java, #_kafka, #_apache_kafka, #_junit, #_services, #_testirovanie_itsistem (
Тестирование IT-систем
)
, #_java, #_apache
Профиль  ЛС 
Показать сообщения:     

Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы

Текущее время: 23-Ноя 01:37
Часовой пояс: UTC + 5