[Java, Kotlin] Производитель/потребитель на Kafka и Kotlin (перевод)
Автор
Сообщение
news_bot ®
Стаж: 6 лет 9 месяцев
Сообщений: 27286
Перевод статьи подготовлен в преддверии старта курса «Backend-разработка на Kotlin»
В этой статье мы поговорим о том, как создать простое приложение на Spring Boot с Kafka и Kotlin.
Введение
Начните с посещения https://start.spring.io и добавьте следующие зависимости:
Groovy
implementation("org.springframework.boot:spring-boot-starter-data-rest")
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.apache.kafka:kafka-streams")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("org.springframework.kafka:spring-kafka")
В нашем примере для сборки мы воспользуемся Gradle. Вы вполне можете выбрать Maven.
Создайте и загрузите проект. Затем импортируйте его в IntelliJ IDEA.
Скачайте Apache Kafka
Загрузите последнюю версию Apache Kafka с их сайта и распакуйте в папку. Я пользуюсь операционной системой Windows 10. При запуске Kafka вы можете столкнуться с некоторыми проблемами по типу «too many lines encountered». Так происходит потому что Kafka добавляет большую структуру папок в свое имя пути. Если эта проблема не будет устранена автоматически, вам придется переименовать структуру папок как-нибудь покороче и запустить приложение из Power Shell.
Чтобы запустить Kafka, воспользуйтесь следующими командами:
Shell
.\zookeeper-server-start.bat ..\..\config\zookeeper.properties
.\kafka-server-start.bat ..\..\config\server.properties
Эти две команды вы увидите в папке /bin/windows.
Чтобы запустить Kafka, сначала нужно запустить Zookeeper. Zookeeper – это продукт Apache, который предоставляет сервис распределенной конфигурации.
Запуск Spring Boot
Первым шагом создайте в своей IDE класс, который называется KafkaDemoApplication.kt. При создании проекта с сайта Spring, класс будет создан автоматически.
Добавьте следующий код:
Kotlin
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
@SpringBootApplication
class KafkaDemoApplication
fun main(args: Array<String>) {
runApplication<KafkaDemoApplication>(*args)
}
Производитель
Мы можем отправлять сообщения в топики двумя способами. Их мы рассмотрим ниже.
Мы разработаем класс-контроллер, который нужен для отправки и получения сообщений. Назовем этот класс KafkaController.kt. И добавим следующий метод:
Kotlin
var kafkaTemplate:KafkaTemplate<String, String>? = null;
val topic:String = "test_topic"
@GetMapping("/send")
fun sendMessage(@RequestParam("message") message : String) : ResponseEntity<String> {
var lf : ListenableFuture<SendResult<String, String>> = kafkaTemplate?.send(topic, message)!!
var sendResult: SendResult<String, String> = lf.get()
return ResponseEntity.ok(sendResult.producerRecord.value() + " sent to topic")
}
Для отправки сообщений в топик, который называется test_topic, мы используем KafkaTemplate. Он будет возвращать объект ListenableFuture, из которого мы можем получить результат этого действия. Такой подход является самым простым, если вы просто хотите отправлять сообщение в топик.
Второй способ
Следующий способ отправки сообщения в топик Kafka – это использование объекта KafkaProducer. Для этого мы напишем следующий код:
Kotlin
@GetMapping("/produce")
fun produceMessage(@RequestParam("message") message : String) : ResponseEntity<String> {
var producerRecord :ProducerRecord<String, String> = ProducerRecord(topic, message)
val map = mutableMapOf<String, String>()
map["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
map["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
map["bootstrap.servers"] = "localhost:9092"
var producer = KafkaProducer<String, String>(map as Map<String, Any>?)
var future:Future<RecordMetadata> = producer?.send(producerRecord)!!
return ResponseEntity.ok(" message sent to " + future.get().topic());
}
И тут нужно сделать небольшое пояснение.
Нам нужно инициализировать объект KafkaProduce с Map, которая будет содержать ключ и значение для сериализации. В нашем примере речь идет о строковом сообщении, поэтому нам нужен только StringSerializer.
В принципе, Serializer – это интерфейс Kafka, который преобразует строки в байты. В Apache Kafka есть и другие сериализаторы, такие как ByteArraySerializer, ByteSerializer, FloatSerializer и др.
Для map мы указываем ключ и значение с StringSerializer.
Kotlin
map["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
map["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
Следующее значение – это сведения о bootstrap-сервере, необходимые для коммуникации с кластером Kafka.
Kotlin
map["bootstrap.servers"] = "localhost:9092"
Все эти атрибуты нужны, если мы используем KafkaProducer.
Затем нам нужно создать ProducerRecord с именем топика и самим сообщением. Именно это мы и сделаем в следующей строке:
Kotlin
var producerRecord :ProducerRecord<String, String> = ProducerRecord(topic, message)
Теперь мы можем отправить наше сообщение в топик с помощью следующего кода:
Kotlin
var future:Future<RecordMetadata> = producer?.send(producerRecord)!!
Эта операция вернет future с именем топика, который используется для отправки сообщения.
Потребитель
Мы посмотрели, как отправлять сообщения в топики. Но нам также нужно слушать входящие сообщения. Чтобы это сделать, нужно создать слушателя, который будет потреблять сообщения.
Давайте создадим класс MessageConsumer.kt и пометим его с помощью Service.
Kotlin
@KafkaListener(topics= ["test_topic"], groupId = "test_id")
fun consume(message:String) :Unit {
println(" message received from topic : $message");
}
Этот метод можно использовать для прослушивания сообщения с помощью аннотации @KafkaListener и вывода сообщения в консоль, как только оно появляется в топике. Только убедитесь, что вы используете то же имя топика, что и для отправки сообщения.
Исходный код вы можете посмотреть в моем репозитории на GitHub.
Узнать подробнее о курсе «Backend-разработка на Kotlin»
===========
Источник:
habr.com
===========
===========
Автор оригинала: Unni Mana
===========Похожие новости:
- [Apache, Big Data] Распределенное обучение с Apache MXNet и Horovod (перевод)
- [CSS, HTML, JavaScript, Программирование, Разработка игр] Создание браузерных 3d-игр с нуля на чистом html, css и js. Часть 1/2
- [PHP, Symfony] Управление секретами в Symfony (перевод)
- [Виртуализация, Настройка Linux] Сравниваем лучшее программное обеспечение для виртуализации в 2020 году: Hyper-V, KVM, vSphere и XenServer (перевод)
- [JavaScript, Программирование, Разработка веб-сайтов, Учебный процесс в IT] Задачки для фронтенд-тренировки: doodle-place, Apple Podcasts, Site Blocker, парсинг CSV-файлов (перевод)
- [JavaScript, ReactJS, TypeScript] Todolist на React Hooks + TypeScript: от сборки до тестирования
- [Java] Многопоточность. Модель памяти Java (часть 2) (перевод)
- [CSS, JavaScript] Atomizer vs Minimalist Notation (MN)
- [IT-компании, Законодательство в IT, Информационная безопасность] Присяжные в США признали россиянина Евгения Никулина виновным во взломе LinkedIn, Dropbox и Formspring в 2012 году
- [] Выгорание сотрудников: основные принципы борьбы, если вы тимлид
Теги для поиска: #_java, #_kotlin, #_kafka, #_kafka_apache, #_kotlin, #_kotlin_and_spring, #_spring, #_spring_boot, #_otus, #_blog_kompanii_otus._onlajnobrazovanie (
Блог компании OTUS. Онлайн-образование
), #_java, #_kotlin
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 13:32
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
Перевод статьи подготовлен в преддверии старта курса «Backend-разработка на Kotlin» В этой статье мы поговорим о том, как создать простое приложение на Spring Boot с Kafka и Kotlin. Введение Начните с посещения https://start.spring.io и добавьте следующие зависимости: Groovy implementation("org.springframework.boot:spring-boot-starter-data-rest")
implementation("org.springframework.boot:spring-boot-starter-web") implementation("com.fasterxml.jackson.module:jackson-module-kotlin") implementation("org.apache.kafka:kafka-streams") implementation("org.jetbrains.kotlin:kotlin-reflect") implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8") implementation("org.springframework.kafka:spring-kafka") В нашем примере для сборки мы воспользуемся Gradle. Вы вполне можете выбрать Maven. Создайте и загрузите проект. Затем импортируйте его в IntelliJ IDEA. Скачайте Apache Kafka Загрузите последнюю версию Apache Kafka с их сайта и распакуйте в папку. Я пользуюсь операционной системой Windows 10. При запуске Kafka вы можете столкнуться с некоторыми проблемами по типу «too many lines encountered». Так происходит потому что Kafka добавляет большую структуру папок в свое имя пути. Если эта проблема не будет устранена автоматически, вам придется переименовать структуру папок как-нибудь покороче и запустить приложение из Power Shell. Чтобы запустить Kafka, воспользуйтесь следующими командами: Shell .\zookeeper-server-start.bat ..\..\config\zookeeper.properties
.\kafka-server-start.bat ..\..\config\server.properties Эти две команды вы увидите в папке /bin/windows. Чтобы запустить Kafka, сначала нужно запустить Zookeeper. Zookeeper – это продукт Apache, который предоставляет сервис распределенной конфигурации. Запуск Spring Boot Первым шагом создайте в своей IDE класс, который называется KafkaDemoApplication.kt. При создании проекта с сайта Spring, класс будет создан автоматически. Добавьте следующий код: Kotlin import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication @SpringBootApplication class KafkaDemoApplication fun main(args: Array<String>) { runApplication<KafkaDemoApplication>(*args) } Производитель Мы можем отправлять сообщения в топики двумя способами. Их мы рассмотрим ниже. Мы разработаем класс-контроллер, который нужен для отправки и получения сообщений. Назовем этот класс KafkaController.kt. И добавим следующий метод: Kotlin var kafkaTemplate:KafkaTemplate<String, String>? = null;
val topic:String = "test_topic" @GetMapping("/send") fun sendMessage(@RequestParam("message") message : String) : ResponseEntity<String> { var lf : ListenableFuture<SendResult<String, String>> = kafkaTemplate?.send(topic, message)!! var sendResult: SendResult<String, String> = lf.get() return ResponseEntity.ok(sendResult.producerRecord.value() + " sent to topic") } Для отправки сообщений в топик, который называется test_topic, мы используем KafkaTemplate. Он будет возвращать объект ListenableFuture, из которого мы можем получить результат этого действия. Такой подход является самым простым, если вы просто хотите отправлять сообщение в топик. Второй способ Следующий способ отправки сообщения в топик Kafka – это использование объекта KafkaProducer. Для этого мы напишем следующий код: Kotlin @GetMapping("/produce")
fun produceMessage(@RequestParam("message") message : String) : ResponseEntity<String> { var producerRecord :ProducerRecord<String, String> = ProducerRecord(topic, message) val map = mutableMapOf<String, String>() map["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer" map["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer" map["bootstrap.servers"] = "localhost:9092" var producer = KafkaProducer<String, String>(map as Map<String, Any>?) var future:Future<RecordMetadata> = producer?.send(producerRecord)!! return ResponseEntity.ok(" message sent to " + future.get().topic()); } И тут нужно сделать небольшое пояснение. Нам нужно инициализировать объект KafkaProduce с Map, которая будет содержать ключ и значение для сериализации. В нашем примере речь идет о строковом сообщении, поэтому нам нужен только StringSerializer. В принципе, Serializer – это интерфейс Kafka, который преобразует строки в байты. В Apache Kafka есть и другие сериализаторы, такие как ByteArraySerializer, ByteSerializer, FloatSerializer и др. Для map мы указываем ключ и значение с StringSerializer. Kotlin map["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
map["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer" Следующее значение – это сведения о bootstrap-сервере, необходимые для коммуникации с кластером Kafka. Kotlin map["bootstrap.servers"] = "localhost:9092"
Все эти атрибуты нужны, если мы используем KafkaProducer. Затем нам нужно создать ProducerRecord с именем топика и самим сообщением. Именно это мы и сделаем в следующей строке: Kotlin var producerRecord :ProducerRecord<String, String> = ProducerRecord(topic, message)
Теперь мы можем отправить наше сообщение в топик с помощью следующего кода: Kotlin var future:Future<RecordMetadata> = producer?.send(producerRecord)!!
Эта операция вернет future с именем топика, который используется для отправки сообщения. Потребитель Мы посмотрели, как отправлять сообщения в топики. Но нам также нужно слушать входящие сообщения. Чтобы это сделать, нужно создать слушателя, который будет потреблять сообщения. Давайте создадим класс MessageConsumer.kt и пометим его с помощью Service. Kotlin @KafkaListener(topics= ["test_topic"], groupId = "test_id")
fun consume(message:String) :Unit { println(" message received from topic : $message"); } Этот метод можно использовать для прослушивания сообщения с помощью аннотации @KafkaListener и вывода сообщения в консоль, как только оно появляется в топике. Только убедитесь, что вы используете то же имя топика, что и для отправки сообщения. Исходный код вы можете посмотреть в моем репозитории на GitHub. Узнать подробнее о курсе «Backend-разработка на Kotlin» =========== Источник: habr.com =========== =========== Автор оригинала: Unni Mana ===========Похожие новости:
Блог компании OTUS. Онлайн-образование ), #_java, #_kotlin |
|
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 13:32
Часовой пояс: UTC + 5