[.NET] First touch of Kafka

Автор Сообщение
news_bot ®

Стаж: 6 лет 7 месяцев
Сообщений: 27286

Создавать темы news_bot ® написал(а)
23-Фев-2021 16:30

Прежде чем начать я бы хотел отметить, что это всего лишь небольшой туториал по быстрому старту для тех кто, как и я, ни разу не использовал Kafka на практике.И так приступим!Единственный брокер Kafka и необходимый для его работы ZooKeeperя буду запускать в Docker.Сперва создам отдельную сеть kafkanet
docker network create kafkanet

Запуск контейнера с ZooKeeper
docker run -d --network=kafkanet --name=zookeeper -e ZOOKEEPER_CLIENT_PORT=2181 -e ZOOKEEPER_TICK_TIME=2000 -p 2181:2181 confluentinc/cp-zookeeper

Запуск контейнера с Kafka
docker run -d --network=kafkanet --name=kafka -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 -p 9092:9092 confluentinc/cp-kafka

Для того чтобы убедиться в отсутствии ошибок, можно вывести лог docker logs kafkaДалее проверю функционирование брокера Kafka, выполнив простые операции, включающие создание тестовой темы, генерацию сообщений и их потреблениеДля этого сценария подключусь к контейнеру kafka
docker exec -it kafka bash

Создам топик demo-topic
/bin/kafka-topics --create --topic demo-topic --bootstrap-server kafka:9092
Выведу список всех топиков
/bin/kafka-topics --list --zookeeper zookeeper:2181
И выведу описание созданного топика
/bin/kafka-topics --describe --topic demo-topic --bootstrap-server kafka:9092

Сгенерирую несколько сообщений
/bin/kafka-console-producer --topic demo-topic --bootstrap-server kafka:9092
И после прочитаю эти сообщения
/bin/kafka-console-consumer --topic demo-topic --from-beginning --bootstrap-server kafka:9092

Далее я создам два небольших .NET приложения: KafkaProducer, которое будет генерировать сообщения, и KafkaConsumer, которое будет потреблять сообщения. Для реализации мне понадобятся пакеты Confluent.Kafka и Microsoft.Extensions.Hosting.В проект KafkaProducer добавлю класс KafkaProducerService
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Threading;
using System.Threading.Tasks;
namespace KafkaProducer
{
    public class KafkaProducerService : IHostedService
    {
        private readonly ILogger<KafkaProducerService> _logger;
        private readonly IProducer<Null, string> _producer;
        public KafkaProducerService(ILogger<KafkaProducerService> logger)
        {
            _logger = logger;
            var config = new ProducerConfig
            {
                BootstrapServers = "localhost:9092"
            };
            _producer = new ProducerBuilder<Null, string>(config).Build();
        }
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            for (var i = 0; i < 5; i++)
            {
                var value = $"Event N {i}";
                _logger.LogInformation($"Sending >> {value}");
                await _producer.ProduceAsync(
                    "demo-topic",
                    new Message<Null, string> { Value = value },
                    cancellationToken);
            }
        }
        public Task StopAsync(CancellationToken cancellationToken)
        {
            _producer?.Dispose();
            _logger.LogInformation($"{nameof(KafkaProducerService)} stopped");
            return Task.CompletedTask;
        }
    }
}
Изменю файл Program.cs
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;
namespace KafkaProducer
{
    class Program
    {
        static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
            Console.ReadKey();
        }
        private static IHostBuilder CreateHostBuilder(string[] args) =>
            Host
                .CreateDefaultBuilder(args)
                .ConfigureServices((context, collection) =>
                    collection.AddHostedService<KafkaProducerService>());
    }
}
В проект KafkaConsumer добавлю класс KafkaConsumerService
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Threading;
using System.Threading.Tasks;
namespace KafkaConsumer
{
    public class KafkaConsumerService : IHostedService
    {
        private readonly ILogger<KafkaConsumerService> _logger;
        private readonly IConsumer<Ignore, string> _consumer;
        public KafkaConsumerService(ILogger<KafkaConsumerService> logger)
        {
            _logger = logger;
            var config = new ConsumerConfig
            {
                BootstrapServers = "localhost:9092",
                GroupId = "demo-group",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };
            _consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        }
        public Task StartAsync(CancellationToken cancellationToken)
        {
            _consumer.Subscribe("demo-topic");
            while (!cancellationToken.IsCancellationRequested)
            {
                var consumeResult = _consumer.Consume(cancellationToken);
                _logger.LogInformation($"Received >> {consumeResult.Message.Value}");
            }
            return Task.CompletedTask;
        }
        public Task StopAsync(CancellationToken cancellationToken)
        {
            _consumer?.Dispose();
            _logger.LogInformation($"{nameof(KafkaConsumerService)} stopped");
            return Task.CompletedTask;
        }
    }
}
Изменю файл Program.cs
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;
namespace KafkaConsumer
{
    class Program
    {
        static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
            Console.ReadKey();
        }
        private static IHostBuilder CreateHostBuilder(string[] args) =>
            Host
                .CreateDefaultBuilder(args)
                .ConfigureServices((context, collection) =>
                    collection.AddHostedService<KafkaConsumerService>());
    }
}
Результат работы приложений (ссылка на репозиторий)
===========
Источник:
habr.com
===========

Похожие новости: Теги для поиска: #_.net, #_.net, #_kafka, #_.net
Профиль  ЛС 
Показать сообщения:     

Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы

Текущее время: 28-Сен 20:26
Часовой пояс: UTC + 5