[Тестирование IT-систем, Java, Apache] Apache Kafka и тестирование с Kafka Server
Автор
Сообщение
news_bot ®
Стаж: 6 лет 9 месяцев
Сообщений: 27286
Введение
Существуют различные способы для написания тестов с использованием Apache Kafka. К примеру, можно использовать TestContainers и EmbeddedKafka. Об этом можно почитать, к примеру, вот здесь: Подводные камни тестирования Kafka Streams. Но существует и вариант для написания тестов с использованием KafkaServer.
Что будет тестироваться?
Предположим, необходимо разработать сервис отправки сообщений по различным каналам: email, telegram и т.п.
Пусть имя сервиса будет: SenderService.
Сервис должен: слушать заданный канал, выделять из канала нужные ему сообщения, разбирать сообщения и отправлять по нужному каналу для конечной доставки сообщений.
Для проверки сервиса необходимо сформировать сообщение для отправки с использованием канала отправки почты и убедиться в том, что сообщение было передано в конечный канал.
Конечно, в реальных приложениях тесты будут сложнее. Но для иллюстрации выбранного подхода, такого теста будет достаточно.
Сервис и тест реализованы с использованием: Java 1.8, Kafka 2.1.0, JUnit 5.5.2, Maven 3.6.1.
Сервис
Сервис будет иметь возможность начать работу и остановить свою работу.
void start()
void stop()
При старте необходимо задать, как минимум, следующие параметры:
String bootstrapServers
String senderTopic
EmailService emailService
bootstrapServers – адрес kafka.
senderTopic – топик, из которого будут считываться сообщения.
emailService – сервис для конечной отправки сообщений по почте.
В реальном сервисе таких конечных сервисов будет столько же сколько и конечных каналов отправки сообщений.
Теперь необходим «потребитель», который слушает канал, фильтрует и отправляет сообщения в конечные каналы. Количество таких «потребителей» можно выбирать. Подход для написания «потребителя» описан вот здесь: Introducing the Kafka Consumer: Getting Started with the New Apache Kafka 0.9 Consumer Client.
Collection<AutoCloseable> closeables = new ArrayList<>();
ExecutorService senderTasksExecutor = Executors.newFixedThreadPool(senderTasksN);
ExecutorService tasksExecutorService = Executors.newFixedThreadPool(tasksN);
for (int i = 0; i < senderTasksN; i++) {
SenderConsumerLoop senderConsumerLoop =
new SenderConsumerLoop(
bootstrapServers,
senderTopic,
"sender",
"sender",
tasksExecutorService,
emailService
);
closeables.add(senderConsumerLoop);
senderTasksExecutor.submit(senderConsumerLoop);
}
В цикле создается экземпляр «потребителя», запоминается в коллекции и запускается через сервис запуска задач.
При выполнении этого кода «потребители» начинают работать. Сервис ждет их завершения или сигнала для остановки.
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
for (AutoCloseable autoCloseable : closeables) {
try {
autoCloseable.close();
} catch (Exception e) {
e.printStackTrace();
}
}
senderTasksExecutor.shutdown();
tasksExecutorService.shutdown();
stop();
try {
senderTasksExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
При завершении необходимо освободить ресурсы.
«Потребитель»
«Потребитель» имеет следующие публичные методы:
void run()
void close()
Основной метод: run.
@Override
public void run() {
kafkaConsumer = createKafkaConsumerStringString(bootstrapServers, clientId, groupId);
kafkaConsumer.subscribe(Collections.singleton(topic));
while (true) {
calculate(kafkaConsumer.poll(Duration.ofSeconds(1)));
}
}
По входным параметрам создается экземпляр «kafka-потребителя». «kafka-потребитель» подписывается на заданный топик. В бесконечном цикле выбираются записи из топика. И отправляются на обработку.
Для иллюстрации json-сообщения будут иметь несколько полей, которые будут задавать и тип сообщения, и данные для отправки.
Пример сообщения:
{
"subject": {
"subject_type": "send"
},
"body": {
"method": "email",
"recipients": "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml",
"title": "42",
"message": "73"
}
}
subject_type — тип сообщения. Для сервиса нужно значение «send».
method – тип конечного сервиса для отправки. «email» — отправка через почту.
recipients – список получателей.
title – заголовок для сообщения.
message – сообщение.
Обработка всех записей:
void calculate(ConsumerRecords<String, String> records) {
for (ConsumerRecord<String, String> record : records) {
calculate(record);
}
}
Обработка одной записи:
void calculate(ConsumerRecord<String, String> record) {
JSONParser jsonParser = new JSONParser();
Object parsedObject = null;
try {
parsedObject = jsonParser.parse(record.value());
} catch (ParseException e) {
e.printStackTrace();
}
if (parsedObject instanceof JSONObject) {
JSONObject jsonObject = (JSONObject) parsedObject;
JSONObject jsonSubject = (JSONObject) jsonObject.get(SUBJECT);
String subjectType = jsonSubject.get(SUBJECT_TYPE).toString();
if (SEND.equals(subjectType)) {
JSONObject jsonBody = (JSONObject) jsonObject.get(BODY);
calculate(jsonBody);
}
}
}
Распределение сообщений по типу:
void calculate(JSONObject jsonBody) {
String method = jsonBody.get(METHOD).toString();
if (EMAIL_METHOD.equals(method)) {
String recipients = jsonBody.get(RECIPIENTS).toString();
String title = jsonBody.get(TITLE).toString();
String message = jsonBody.get(MESSAGE).toString();
sendEmail(recipients, title, message);
}
}
Отправка в конечную систему:
void sendEmail(String recipients, String title, String message) {
tasksExecutorService.submit(() -> emailService.send(recipients, title, message));
}
Отправка сообщений происходит через сервис исполнения задач.
Ожидания завершения отправки не происходит.
Создание «kafka-потребителя»:
static KafkaConsumer<String, String> createKafkaConsumerStringString(
String bootstrapServers,
String clientId,
String groupId
) {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new KafkaConsumer<>(properties);
}
Интерфейс для писем:
interface EmailService {
void send(String recipients, String title, String message);
}
Тест
Для теста понадобиться следующее.
Адрес «kafka-сервера».
Порт для «kafka-сервера».
Имя топика.
Сервис для управления «kafka-сервером». Будет описан ниже.
public class SenderServiceTest {
@Test
void consumeEmail() throws InterruptedException {
String brokerHost = "127.0.0.1";
int brokerPort = 29092;
String bootstrapServers = brokerHost + ":" + brokerPort;
String senderTopic = "sender_data";
try (KafkaServerService kafkaServerService = new KafkaServerService(brokerHost, brokerPort)) {
kafkaServerService.start();
kafkaServerService.createTopic(senderTopic);
}
}
}
Задаются параметры. Создается сервис для управления «kafka-сервером». «kafka-сервером» стартует. Создается необходимый топик.
Создается «mock» конечного сервиса для отправки сообщений:
SenderService.EmailService emailService = mock(SenderService.EmailService.class);
Создается сам сервис и стартует:
SenderService senderService = new SenderService(bootstrapServers, senderTopic, emailService);
senderService.start();
Задаются параметры для сообщения:
String recipients = "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml";
String title = "42";
String message = "73";
Отправляется сообщение в канал:
kafkaServerService.send(senderTopic, key(), createMessage(EMAIL_METHOD, recipients, title, message));
Ожидание:
Thread.sleep(6000);
Проверка, что сообщение дошло до конечного сервиса:
verify(emailService).send(recipients, title, message);
Остановка:
senderService.stop();
Все вместе:
public class SenderServiceTest {
@Test
void consumeEmail() throws InterruptedException {
String brokerHost = "127.0.0.1";
int brokerPort = 29092;
String bootstrapServers = brokerHost + ":" + brokerPort;
String senderTopic = "sender_data";
try (KafkaServerService kafkaServerService = new KafkaServerService(brokerHost, brokerPort)) {
kafkaServerService.start();
kafkaServerService.createTopic(senderTopic);
SenderService.EmailService emailService = mock(SenderService.EmailService.class);
SenderService senderService = new SenderService(bootstrapServers, senderTopic, emailService);
senderService.start();
String recipients = "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml";
String title = "42";
String message = "73";
kafkaServerService.send(senderTopic, key(), createMessage(EMAIL_METHOD, recipients, title, message));
Thread.sleep(6000);
verify(emailService).send(recipients, title, message);
senderService.stop();
}
}
}
Вспомогательный код:
public class SenderFactory {
public static final String SUBJECT = "subject";
public static final String SUBJECT_TYPE = "subject_type";
public static final String BODY = "body";
public static final String METHOD = "method";
public static final String EMAIL_METHOD = "email";
public static final String RECIPIENTS = "recipients";
public static final String TITLE = "title";
public static final String MESSAGE = "message";
public static final String SEND = "send";
public static String key() {
return UUID.randomUUID().toString();
}
public static String createMessage(String method, String recipients, String title, String message) {
Map<String, Object> map = new HashMap<>();
Map<String, Object> subject = new HashMap<>();
Map<String, Object> body = new HashMap<>();
map.put(SUBJECT, subject);
subject.put(SUBJECT_TYPE, SEND);
map.put(BODY, body);
body.put(METHOD, method);
body.put(RECIPIENTS, recipients);
body.put(TITLE, title);
body.put(MESSAGE, message);
return JSONObject.toJSONString(map);
}
}
Сервис для управления «kafka-сервером»
Основные методы:
void start()
void close()
void createTopic(String topic)
В методе «start» происходит создание сервера и вспомогательных объектов.
Создание «zookeeper» и сохранение его адреса:
zkServer = new EmbeddedZookeeper();
String zkConnect = zkHost + ":" + zkServer.port();
Создание клиента «zookeeper»:
zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
zkUtils = ZkUtils.apply(zkClient, false);
Задание свойств для сервера:
Properties brokerProps = new Properties();
brokerProps.setProperty("zookeeper.connect", zkConnect);
brokerProps.setProperty("broker.id", "0");
try {
brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
} catch (IOException e) {
throw new RuntimeException(e);
}
brokerProps.setProperty("listeners", "PLAINTEXT://" + brokerHost + ":" + brokerPort);
brokerProps.setProperty("offsets.topic.replication.factor", "1");
KafkaConfig config = new KafkaConfig(brokerProps);
Создание сервера:
kafkaServer = TestUtils.createServer(config, new MockTime());
Все вместе:
public void start() {
zkServer = new EmbeddedZookeeper();
String zkConnect = zkHost + ":" + zkServer.port();
zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
zkUtils = ZkUtils.apply(zkClient, false);
Properties brokerProps = new Properties();
brokerProps.setProperty("zookeeper.connect", zkConnect);
brokerProps.setProperty("broker.id", "0");
try {
brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
} catch (IOException e) {
throw new RuntimeException(e);
}
brokerProps.setProperty("listeners", "PLAINTEXT://" + brokerHost + ":" + brokerPort);
brokerProps.setProperty("offsets.topic.replication.factor", "1");
KafkaConfig config = new KafkaConfig(brokerProps);
kafkaServer = TestUtils.createServer(config, new MockTime());
}
Остановка сервиса:
@Override
public void close() {
kafkaServer.shutdown();
zkClient.close();
zkServer.shutdown();
}
Создание топика:
public void createTopic(String topic) {
AdminUtils.createTopic(
zkUtils, topic, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
}
Заключение
В заключении нужно отметить, что приведенный здесь код лишь иллюстрирует выбранный способ.
Для создания и тестирования сервисов с использованием «kafka» можно обратиться к следующему ресурсу:
kafka-streams-examples
Ссылки и ресурсы
Исходный код
Код для тестирования с «kafka-сервером»
===========
Источник:
habr.com
===========
Похожие новости:
- [Java, API] Что было раньше: код или документация? OpenApi (OAS 3.0) и проблемы кодогенерации на Java
- [Системное администрирование, Серверное администрирование, Управление разработкой, DevOps] Приглашаем на митап «Apache Kafka в вопросах и ответах» 17 ноября в 19:00
- [Apache, Atlassian] Автоматизация аналитики Jira средствами Apache NiFi
- [JavaScript, ReactJS] Все ли вы знаете о React key?
- [Java] Внедрение рекомендаций по структуре кода с использованием ArchUnit (перевод)
- [JavaScript] Namespaces в JavaScript
- [Разработка веб-сайтов, ReactJS] React — Используйте стандартные пропсы для потока данных
- [Информационная безопасность, Программирование, Java, GitHub, Софт] Архитектурные подходы к авторизации в серверных приложениях: Activity-Based Access Control Framework
- [Программирование, Java, Kotlin] Переезд из Java в Kotlin: как забрать коллекции с собой
- [Информационная безопасность, JavaScript, Google Chrome, Браузеры] Google Chrome начнет блокировать JavaScript-редирект по кликам на ссылки
Теги для поиска: #_testirovanie_itsistem (Тестирование IT-систем), #_java, #_apache, #_java, #_kafka, #_apache_kafka, #_junit, #_services, #_testirovanie_itsistem (
Тестирование IT-систем
), #_java, #_apache
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 20:13
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
Введение Существуют различные способы для написания тестов с использованием Apache Kafka. К примеру, можно использовать TestContainers и EmbeddedKafka. Об этом можно почитать, к примеру, вот здесь: Подводные камни тестирования Kafka Streams. Но существует и вариант для написания тестов с использованием KafkaServer. Что будет тестироваться? Предположим, необходимо разработать сервис отправки сообщений по различным каналам: email, telegram и т.п. Пусть имя сервиса будет: SenderService. Сервис должен: слушать заданный канал, выделять из канала нужные ему сообщения, разбирать сообщения и отправлять по нужному каналу для конечной доставки сообщений. Для проверки сервиса необходимо сформировать сообщение для отправки с использованием канала отправки почты и убедиться в том, что сообщение было передано в конечный канал. Конечно, в реальных приложениях тесты будут сложнее. Но для иллюстрации выбранного подхода, такого теста будет достаточно. Сервис и тест реализованы с использованием: Java 1.8, Kafka 2.1.0, JUnit 5.5.2, Maven 3.6.1. Сервис Сервис будет иметь возможность начать работу и остановить свою работу. void start()
void stop() При старте необходимо задать, как минимум, следующие параметры: String bootstrapServers
String senderTopic EmailService emailService bootstrapServers – адрес kafka. senderTopic – топик, из которого будут считываться сообщения. emailService – сервис для конечной отправки сообщений по почте. В реальном сервисе таких конечных сервисов будет столько же сколько и конечных каналов отправки сообщений. Теперь необходим «потребитель», который слушает канал, фильтрует и отправляет сообщения в конечные каналы. Количество таких «потребителей» можно выбирать. Подход для написания «потребителя» описан вот здесь: Introducing the Kafka Consumer: Getting Started with the New Apache Kafka 0.9 Consumer Client. Collection<AutoCloseable> closeables = new ArrayList<>();
ExecutorService senderTasksExecutor = Executors.newFixedThreadPool(senderTasksN); ExecutorService tasksExecutorService = Executors.newFixedThreadPool(tasksN); for (int i = 0; i < senderTasksN; i++) { SenderConsumerLoop senderConsumerLoop = new SenderConsumerLoop( bootstrapServers, senderTopic, "sender", "sender", tasksExecutorService, emailService ); closeables.add(senderConsumerLoop); senderTasksExecutor.submit(senderConsumerLoop); } В цикле создается экземпляр «потребителя», запоминается в коллекции и запускается через сервис запуска задач. При выполнении этого кода «потребители» начинают работать. Сервис ждет их завершения или сигнала для остановки. Runtime.getRuntime().addShutdownHook(new Thread(() -> {
for (AutoCloseable autoCloseable : closeables) { try { autoCloseable.close(); } catch (Exception e) { e.printStackTrace(); } } senderTasksExecutor.shutdown(); tasksExecutorService.shutdown(); stop(); try { senderTasksExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); } })); При завершении необходимо освободить ресурсы. «Потребитель» «Потребитель» имеет следующие публичные методы: void run()
void close() Основной метод: run. @Override
public void run() { kafkaConsumer = createKafkaConsumerStringString(bootstrapServers, clientId, groupId); kafkaConsumer.subscribe(Collections.singleton(topic)); while (true) { calculate(kafkaConsumer.poll(Duration.ofSeconds(1))); } } По входным параметрам создается экземпляр «kafka-потребителя». «kafka-потребитель» подписывается на заданный топик. В бесконечном цикле выбираются записи из топика. И отправляются на обработку. Для иллюстрации json-сообщения будут иметь несколько полей, которые будут задавать и тип сообщения, и данные для отправки. Пример сообщения: {
"subject": { "subject_type": "send" }, "body": { "method": "email", "recipients": "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml", "title": "42", "message": "73" } } subject_type — тип сообщения. Для сервиса нужно значение «send». method – тип конечного сервиса для отправки. «email» — отправка через почту. recipients – список получателей. title – заголовок для сообщения. message – сообщение. Обработка всех записей: void calculate(ConsumerRecords<String, String> records) {
for (ConsumerRecord<String, String> record : records) { calculate(record); } } Обработка одной записи: void calculate(ConsumerRecord<String, String> record) {
JSONParser jsonParser = new JSONParser(); Object parsedObject = null; try { parsedObject = jsonParser.parse(record.value()); } catch (ParseException e) { e.printStackTrace(); } if (parsedObject instanceof JSONObject) { JSONObject jsonObject = (JSONObject) parsedObject; JSONObject jsonSubject = (JSONObject) jsonObject.get(SUBJECT); String subjectType = jsonSubject.get(SUBJECT_TYPE).toString(); if (SEND.equals(subjectType)) { JSONObject jsonBody = (JSONObject) jsonObject.get(BODY); calculate(jsonBody); } } } Распределение сообщений по типу: void calculate(JSONObject jsonBody) {
String method = jsonBody.get(METHOD).toString(); if (EMAIL_METHOD.equals(method)) { String recipients = jsonBody.get(RECIPIENTS).toString(); String title = jsonBody.get(TITLE).toString(); String message = jsonBody.get(MESSAGE).toString(); sendEmail(recipients, title, message); } } Отправка в конечную систему: void sendEmail(String recipients, String title, String message) {
tasksExecutorService.submit(() -> emailService.send(recipients, title, message)); } Отправка сообщений происходит через сервис исполнения задач. Ожидания завершения отправки не происходит. Создание «kafka-потребителя»: static KafkaConsumer<String, String> createKafkaConsumerStringString(
String bootstrapServers, String clientId, String groupId ) { Properties properties = new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId); properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); properties.setProperty( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return new KafkaConsumer<>(properties); } Интерфейс для писем: interface EmailService {
void send(String recipients, String title, String message); } Тест Для теста понадобиться следующее. Адрес «kafka-сервера». Порт для «kafka-сервера». Имя топика. Сервис для управления «kafka-сервером». Будет описан ниже. public class SenderServiceTest {
@Test void consumeEmail() throws InterruptedException { String brokerHost = "127.0.0.1"; int brokerPort = 29092; String bootstrapServers = brokerHost + ":" + brokerPort; String senderTopic = "sender_data"; try (KafkaServerService kafkaServerService = new KafkaServerService(brokerHost, brokerPort)) { kafkaServerService.start(); kafkaServerService.createTopic(senderTopic); } } } Задаются параметры. Создается сервис для управления «kafka-сервером». «kafka-сервером» стартует. Создается необходимый топик. Создается «mock» конечного сервиса для отправки сообщений: SenderService.EmailService emailService = mock(SenderService.EmailService.class);
Создается сам сервис и стартует: SenderService senderService = new SenderService(bootstrapServers, senderTopic, emailService);
senderService.start(); Задаются параметры для сообщения: String recipients = "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml";
String title = "42"; String message = "73"; Отправляется сообщение в канал: kafkaServerService.send(senderTopic, key(), createMessage(EMAIL_METHOD, recipients, title, message));
Ожидание: Thread.sleep(6000);
Проверка, что сообщение дошло до конечного сервиса: verify(emailService).send(recipients, title, message);
Остановка: senderService.stop();
Все вместе: public class SenderServiceTest {
@Test void consumeEmail() throws InterruptedException { String brokerHost = "127.0.0.1"; int brokerPort = 29092; String bootstrapServers = brokerHost + ":" + brokerPort; String senderTopic = "sender_data"; try (KafkaServerService kafkaServerService = new KafkaServerService(brokerHost, brokerPort)) { kafkaServerService.start(); kafkaServerService.createTopic(senderTopic); SenderService.EmailService emailService = mock(SenderService.EmailService.class); SenderService senderService = new SenderService(bootstrapServers, senderTopic, emailService); senderService.start(); String recipients = "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml"; String title = "42"; String message = "73"; kafkaServerService.send(senderTopic, key(), createMessage(EMAIL_METHOD, recipients, title, message)); Thread.sleep(6000); verify(emailService).send(recipients, title, message); senderService.stop(); } } } Вспомогательный код: public class SenderFactory {
public static final String SUBJECT = "subject"; public static final String SUBJECT_TYPE = "subject_type"; public static final String BODY = "body"; public static final String METHOD = "method"; public static final String EMAIL_METHOD = "email"; public static final String RECIPIENTS = "recipients"; public static final String TITLE = "title"; public static final String MESSAGE = "message"; public static final String SEND = "send"; public static String key() { return UUID.randomUUID().toString(); } public static String createMessage(String method, String recipients, String title, String message) { Map<String, Object> map = new HashMap<>(); Map<String, Object> subject = new HashMap<>(); Map<String, Object> body = new HashMap<>(); map.put(SUBJECT, subject); subject.put(SUBJECT_TYPE, SEND); map.put(BODY, body); body.put(METHOD, method); body.put(RECIPIENTS, recipients); body.put(TITLE, title); body.put(MESSAGE, message); return JSONObject.toJSONString(map); } } Сервис для управления «kafka-сервером» Основные методы: void start()
void close() void createTopic(String topic) В методе «start» происходит создание сервера и вспомогательных объектов. Создание «zookeeper» и сохранение его адреса: zkServer = new EmbeddedZookeeper();
String zkConnect = zkHost + ":" + zkServer.port(); Создание клиента «zookeeper»: zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
zkUtils = ZkUtils.apply(zkClient, false); Задание свойств для сервера: Properties brokerProps = new Properties();
brokerProps.setProperty("zookeeper.connect", zkConnect); brokerProps.setProperty("broker.id", "0"); try { brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); } catch (IOException e) { throw new RuntimeException(e); } brokerProps.setProperty("listeners", "PLAINTEXT://" + brokerHost + ":" + brokerPort); brokerProps.setProperty("offsets.topic.replication.factor", "1"); KafkaConfig config = new KafkaConfig(brokerProps); Создание сервера: kafkaServer = TestUtils.createServer(config, new MockTime());
Все вместе: public void start() {
zkServer = new EmbeddedZookeeper(); String zkConnect = zkHost + ":" + zkServer.port(); zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); zkUtils = ZkUtils.apply(zkClient, false); Properties brokerProps = new Properties(); brokerProps.setProperty("zookeeper.connect", zkConnect); brokerProps.setProperty("broker.id", "0"); try { brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); } catch (IOException e) { throw new RuntimeException(e); } brokerProps.setProperty("listeners", "PLAINTEXT://" + brokerHost + ":" + brokerPort); brokerProps.setProperty("offsets.topic.replication.factor", "1"); KafkaConfig config = new KafkaConfig(brokerProps); kafkaServer = TestUtils.createServer(config, new MockTime()); } Остановка сервиса: @Override
public void close() { kafkaServer.shutdown(); zkClient.close(); zkServer.shutdown(); } Создание топика: public void createTopic(String topic) {
AdminUtils.createTopic( zkUtils, topic, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); } Заключение В заключении нужно отметить, что приведенный здесь код лишь иллюстрирует выбранный способ. Для создания и тестирования сервисов с использованием «kafka» можно обратиться к следующему ресурсу: kafka-streams-examples Ссылки и ресурсы Исходный код Код для тестирования с «kafka-сервером» =========== Источник: habr.com =========== Похожие новости:
Тестирование IT-систем ), #_java, #_apache |
|
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 20:13
Часовой пояс: UTC + 5