[Тестирование веб-сервисов, Kotlin, Gradle, Распределённые системы, Микросервисы] Использование Spring Cloud Stream Binding с брокером сообщений Kafka
Автор
Сообщение
news_bot ®
Стаж: 6 лет 9 месяцев
Сообщений: 27286
Всем привет! Меня зовут Виталий, я разработчик в компании Web3Tech. В этом посте я представлю основные концепции и конструкции платформы Spring Cloud Stream для поддержки и работы с брокерами сообщений Kafka, с полным циклом их контекстного unit-тестирования. Мы используем такую схему в своем проекте всероссийского электронного голосования на блокчейн-платформе Waves Enterprise.Являясь частью группы проектов Spring Cloud, Spring Cloud Stream основан на Spring Boot и использует Spring Integration для обеспечения связи с брокерами сообщений. При этом он легко интегрируется с различными брокерами сообщений и требует минимальной конфигурации для создания event-driven или message-driven микросервисов.Конфигурация и зависимостиДля начала нам нужно добавить зависимость spring-cloud-starter-stream-kafka в build.gradle:
dependencies {
implementation(kotlin("stdlib"))
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.cloud:spring-cloud-stream-test-support")
testImplementation("org.springframework.kafka:spring-kafka-test:springKafkaTestVersion")
}
В конфигурацию проекта Spring Cloud Stream необходимо включить URL Kafka-брокера, имя очереди (топик) и другие параметры биндинга. Вот пример YAML-конфигурации для сервиса application.yaml:
spring:
application:
name: cloud-stream-binding-kafka-app
cloud:
stream:
kafka:
binder:
brokers: 0.0.0.0:8080
configuration:
auto-offset-reset: latest
bindings:
customChannel: #Channel name
destination: 0.0.0.0:8080 #Destination to which the message is sent (topic)
group: input-group-N
contentType: application/json
consumer:
max-attempts: 1
autoCommitOffset: true
autoCommitOnError: false
Концепция и классыПо сути, мы имеем дело с сервисом, построенным на Spring Cloud Stream, который прослушивает входящую очередь, используя биндинги (SpringCloudStreamBindingKafkaApp.kt):
@EnableBinding(ProducerBinding::class)
@SpringBootApplication
class SpringCloudStreamBindingKafkaApp
fun main(args: Array<String>) {
SpringApplication.run(SpringCloudStreamBindingKafkaApp::class.java, *args)
}
Аннотация @EnableBinding указывает сервису на биндинг как входящего, так и исходящего канала.Здесь необходимо уточнить ряд концепций.Binding — интерфейс, в котором описаны входящие и исходящие каналы.
Binder — имплементация middleware для сообщений.
Channel — представляет канал для передачи сообщений между middleware и приложением.
StreamListeners — методы обработки сообщений в виде бинов (beans), которые будут автоматически вызваны после того, как MessageConverter осуществит сериализацию или десериализацию между событиями в middleware и типами объектов в домене “DTO”.
Message Schema — схемы, используемые для сериализации и десериализации сообщений. Могут быть прочитаны из источника или динамически загружены.ТестированиеЧтобы протестировать сообщение и операции send/receive, нам нужно создать как минимум одного producer и одного consumer. Вот простейший пример того, как это можно сделать в Spring Cloud Stream.Инстанс бина Producer будет отправлять сообщение в топик Kafka, используя биндер (ProducerBinding.kt):
interface ProducerBinding {
@Output(BINDING_TARGET_NAME)
fun messageChannel(): MessageChannel
}
Инстанс бина Сonsumer будет слушать топик Kafka и получать сообщения.ConsumerBinding.kt:
interface ConsumerBinding {
companion object {
const val BINDING_TARGET_NAME = "customChannel"
}
@Input(BINDING_TARGET_NAME)
fun messageChannel(): MessageChannel
}
Consumer.kt:
@EnableBinding(ConsumerBinding::class)
class Consumer(val messageService: MessageService) {
@StreamListener(target = ConsumerBinding.BINDING_TARGET_NAME)
fun process(
@Payload message: Map<String, Any?>,
@Header(value = KafkaHeaders.OFFSET, required = false) offset: Int?
) {
messageService.consume(message)
}
}
Мы создали брокер Kafka с топиком. Для тестирования будем использовать встроенную Kafka, доступную нам с зависимостью spring-kafka-test.Функциональное тестирование с MessageCollectorМы имеем дело с имплементацией биндера, позволяющей взаимодействовать с каналами и получать сообщения. Отправим сообщение в канал ProducerBinding и затем получим его в виде payload ProducerTest.kt:
@SpringBootTest
class ProducerTest {
@Autowired
lateinit var producerBinding: ProducerBinding
@Autowired
lateinit var messageCollector: MessageCollector
@Test
fun `should produce somePayload to channel`() {
// ARRANGE
val request = mapOf(1 to "foo", 2 to "bar", "three" to 10101)
// ACT
producerBinding.messageChannel().send(MessageBuilder.withPayload(request).build())
val payload = messageCollector.forChannel(producerBinding.messageChannel())
.poll()
.payload
// ASSERT
val payloadAsMap = jacksonObjectMapper().readValue(payload.toString(), Map::class.java)
assertTrue(request.entries.stream().allMatch { re ->
re.value == payloadAsMap[re.key.toString()]
})
messageCollector.forChannel(producerBinding.messageChannel()).clear()
}
}
Тестирование с брокером Embedded KafkaИспользуем аннотацию @ClassRule для создания брокера. Так мы сможем поднять сервера Kafka и Zookeeper на случайном порте перед началом теста и выключить их, когда тест завершится. Это избавляет нас от необходимости в рабочем инстансе Kafka и Zookeper на всё время проведения теста (ConsumerTest.kt):
@SpringBootTest
@ActiveProfiles("test")
@EnableAutoConfiguration(exclude = [TestSupportBinderAutoConfiguration::class])
@EnableBinding(ProducerBinding::class)
class ConsumerTest {
@Autowired
lateinit var producerBinding: ProducerBinding
@Autowired
lateinit var objectMapper: ObjectMapper
@MockBean
lateinit var messageService: MessageService
companion object {
@ClassRule @JvmField
var embeddedKafka = EmbeddedKafkaRule(1, true, "any-name-of-topic")
}
@Test
fun `should consume via txConsumer process`() {
// ACT
val request = mapOf(1 to "foo", 2 to "bar")
producerBinding.messageChannel().send(MessageBuilder.withPayload(request)
.setHeader("someHeaderName", "someHeaderValue")
.build())
// ASSERT
val requestAsMap = objectMapper.readValue<Map<String, Any?>>(objectMapper.writeValueAsString(request))
runBlocking {
delay(20)
verify(messageService).consume(requestAsMap)
}
}
}
ЗаключениеВ этом посте я продемонстрировал возможности Spring Cloud Stream и использования его с Kafka. Spring Cloud Stream предлагает удобный интерфейс с упрощенными нюансами настройки брокера, быстро внедряется, стабильно работает и поддерживает современные популярные брокеры, такие как Kafka. По итогам я привел ряд примеров с unit-тестированием на основе EmbeddedKafkaRule с использованием MessageCollector.Все исходники можно найти на Github. Спасибо за прочтение!
===========
Источник:
habr.com
===========
Похожие новости:
- [] DIY Mobile Day в формате Live coding
- [Информационная безопасность, Тестирование веб-сервисов] TWAPT — пентестим по-белому в домашних условиях
- [Тестирование веб-сервисов] Как не надо заводить баги. Часто встречающиеся ошибки
- [Разработка мобильных приложений, Разработка под Android, Kotlin, Дизайн мобильных приложений] Реализация Undo в Snackbar на Jetpack Compose (перевод)
- [Java, Kotlin] Реактивный масштабируемый чат на Kotlin + Spring + WebSockets
- [Kotlin, Карьера в IT-индустрии, Конференции, Kubernetes] 22 апреля — новый QIWI Server Party
- [Облачные вычисления, Разработка под Linux, Распределённые системы, Процессоры] Сравнение криптографической производительности популярных ARM-процессоров для DYI и Edge-устройств, плюс Xeon E-2224
- [Программирование, TDD, Разработка под Android, Kotlin] Пишем unit тесты так, чтобы не было мучительно больно
- [IT-инфраструктура, Виртуализация, IT-компании, Микросервисы, Kubernetes] Угрожает ли микросервисная (контейнеризация) архитектура светлому будущему Published Apps (Citrix & Co.)
- [Программирование, Тестирование веб-сервисов, DevOps] Вы неправильно используете docker-compose (перевод)
Теги для поиска: #_testirovanie_vebservisov (Тестирование веб-сервисов), #_kotlin, #_gradle, #_raspredelennye_sistemy (Распределённые системы), #_mikroservisy (Микросервисы), #_spring_cloud, #_kafka_streams, #_kafka_binder, #_kotlin, #_gradle, #_testirovanie_vebservisov (
Тестирование веб-сервисов
), #_kotlin, #_gradle, #_raspredelennye_sistemy (
Распределённые системы
), #_mikroservisy (
Микросервисы
)
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 09:06
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
Всем привет! Меня зовут Виталий, я разработчик в компании Web3Tech. В этом посте я представлю основные концепции и конструкции платформы Spring Cloud Stream для поддержки и работы с брокерами сообщений Kafka, с полным циклом их контекстного unit-тестирования. Мы используем такую схему в своем проекте всероссийского электронного голосования на блокчейн-платформе Waves Enterprise.Являясь частью группы проектов Spring Cloud, Spring Cloud Stream основан на Spring Boot и использует Spring Integration для обеспечения связи с брокерами сообщений. При этом он легко интегрируется с различными брокерами сообщений и требует минимальной конфигурации для создания event-driven или message-driven микросервисов.Конфигурация и зависимостиДля начала нам нужно добавить зависимость spring-cloud-starter-stream-kafka в build.gradle: dependencies {
implementation(kotlin("stdlib")) implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion") implementation("com.fasterxml.jackson.module:jackson-module-kotlin") implementation("org.springframework.boot:spring-boot-starter-web") implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka") testImplementation("org.springframework.boot:spring-boot-starter-test") testImplementation("org.springframework.cloud:spring-cloud-stream-test-support") testImplementation("org.springframework.kafka:spring-kafka-test:springKafkaTestVersion") } spring:
application: name: cloud-stream-binding-kafka-app cloud: stream: kafka: binder: brokers: 0.0.0.0:8080 configuration: auto-offset-reset: latest bindings: customChannel: #Channel name destination: 0.0.0.0:8080 #Destination to which the message is sent (topic) group: input-group-N contentType: application/json consumer: max-attempts: 1 autoCommitOffset: true autoCommitOnError: false @EnableBinding(ProducerBinding::class)
@SpringBootApplication class SpringCloudStreamBindingKafkaApp fun main(args: Array<String>) { SpringApplication.run(SpringCloudStreamBindingKafkaApp::class.java, *args) } Binder — имплементация middleware для сообщений. Channel — представляет канал для передачи сообщений между middleware и приложением. StreamListeners — методы обработки сообщений в виде бинов (beans), которые будут автоматически вызваны после того, как MessageConverter осуществит сериализацию или десериализацию между событиями в middleware и типами объектов в домене “DTO”. Message Schema — схемы, используемые для сериализации и десериализации сообщений. Могут быть прочитаны из источника или динамически загружены.ТестированиеЧтобы протестировать сообщение и операции send/receive, нам нужно создать как минимум одного producer и одного consumer. Вот простейший пример того, как это можно сделать в Spring Cloud Stream.Инстанс бина Producer будет отправлять сообщение в топик Kafka, используя биндер (ProducerBinding.kt): interface ProducerBinding {
@Output(BINDING_TARGET_NAME) fun messageChannel(): MessageChannel } interface ConsumerBinding {
companion object { const val BINDING_TARGET_NAME = "customChannel" } @Input(BINDING_TARGET_NAME) fun messageChannel(): MessageChannel } @EnableBinding(ConsumerBinding::class)
class Consumer(val messageService: MessageService) { @StreamListener(target = ConsumerBinding.BINDING_TARGET_NAME) fun process( @Payload message: Map<String, Any?>, @Header(value = KafkaHeaders.OFFSET, required = false) offset: Int? ) { messageService.consume(message) } } @SpringBootTest
class ProducerTest { @Autowired lateinit var producerBinding: ProducerBinding @Autowired lateinit var messageCollector: MessageCollector @Test fun `should produce somePayload to channel`() { // ARRANGE val request = mapOf(1 to "foo", 2 to "bar", "three" to 10101) // ACT producerBinding.messageChannel().send(MessageBuilder.withPayload(request).build()) val payload = messageCollector.forChannel(producerBinding.messageChannel()) .poll() .payload // ASSERT val payloadAsMap = jacksonObjectMapper().readValue(payload.toString(), Map::class.java) assertTrue(request.entries.stream().allMatch { re -> re.value == payloadAsMap[re.key.toString()] }) messageCollector.forChannel(producerBinding.messageChannel()).clear() } } @SpringBootTest
@ActiveProfiles("test") @EnableAutoConfiguration(exclude = [TestSupportBinderAutoConfiguration::class]) @EnableBinding(ProducerBinding::class) class ConsumerTest { @Autowired lateinit var producerBinding: ProducerBinding @Autowired lateinit var objectMapper: ObjectMapper @MockBean lateinit var messageService: MessageService companion object { @ClassRule @JvmField var embeddedKafka = EmbeddedKafkaRule(1, true, "any-name-of-topic") } @Test fun `should consume via txConsumer process`() { // ACT val request = mapOf(1 to "foo", 2 to "bar") producerBinding.messageChannel().send(MessageBuilder.withPayload(request) .setHeader("someHeaderName", "someHeaderValue") .build()) // ASSERT val requestAsMap = objectMapper.readValue<Map<String, Any?>>(objectMapper.writeValueAsString(request)) runBlocking { delay(20) verify(messageService).consume(requestAsMap) } } } =========== Источник: habr.com =========== Похожие новости:
Тестирование веб-сервисов ), #_kotlin, #_gradle, #_raspredelennye_sistemy ( Распределённые системы ), #_mikroservisy ( Микросервисы ) |
|
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 09:06
Часовой пояс: UTC + 5