[.NET] First touch of Kafka
Автор
Сообщение
news_bot ®
Стаж: 6 лет 9 месяцев
Сообщений: 27286
Прежде чем начать я бы хотел отметить, что это всего лишь небольшой туториал по быстрому старту для тех кто, как и я, ни разу не использовал Kafka на практике.И так приступим!Единственный брокер Kafka и необходимый для его работы ZooKeeperя буду запускать в Docker.Сперва создам отдельную сеть kafkanet
docker network create kafkanet
Запуск контейнера с ZooKeeper
docker run -d --network=kafkanet --name=zookeeper -e ZOOKEEPER_CLIENT_PORT=2181 -e ZOOKEEPER_TICK_TIME=2000 -p 2181:2181 confluentinc/cp-zookeeper
Запуск контейнера с Kafka
docker run -d --network=kafkanet --name=kafka -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 -p 9092:9092 confluentinc/cp-kafka
Для того чтобы убедиться в отсутствии ошибок, можно вывести лог docker logs kafkaДалее проверю функционирование брокера Kafka, выполнив простые операции, включающие создание тестовой темы, генерацию сообщений и их потреблениеДля этого сценария подключусь к контейнеру kafka
docker exec -it kafka bash
Создам топик demo-topic
/bin/kafka-topics --create --topic demo-topic --bootstrap-server kafka:9092
Выведу список всех топиков
/bin/kafka-topics --list --zookeeper zookeeper:2181
И выведу описание созданного топика
/bin/kafka-topics --describe --topic demo-topic --bootstrap-server kafka:9092
Сгенерирую несколько сообщений
/bin/kafka-console-producer --topic demo-topic --bootstrap-server kafka:9092
И после прочитаю эти сообщения
/bin/kafka-console-consumer --topic demo-topic --from-beginning --bootstrap-server kafka:9092
Далее я создам два небольших .NET приложения: KafkaProducer, которое будет генерировать сообщения, и KafkaConsumer, которое будет потреблять сообщения. Для реализации мне понадобятся пакеты Confluent.Kafka и Microsoft.Extensions.Hosting.В проект KafkaProducer добавлю класс KafkaProducerService
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Threading;
using System.Threading.Tasks;
namespace KafkaProducer
{
public class KafkaProducerService : IHostedService
{
private readonly ILogger<KafkaProducerService> _logger;
private readonly IProducer<Null, string> _producer;
public KafkaProducerService(ILogger<KafkaProducerService> logger)
{
_logger = logger;
var config = new ProducerConfig
{
BootstrapServers = "localhost:9092"
};
_producer = new ProducerBuilder<Null, string>(config).Build();
}
public async Task StartAsync(CancellationToken cancellationToken)
{
for (var i = 0; i < 5; i++)
{
var value = $"Event N {i}";
_logger.LogInformation($"Sending >> {value}");
await _producer.ProduceAsync(
"demo-topic",
new Message<Null, string> { Value = value },
cancellationToken);
}
}
public Task StopAsync(CancellationToken cancellationToken)
{
_producer?.Dispose();
_logger.LogInformation($"{nameof(KafkaProducerService)} stopped");
return Task.CompletedTask;
}
}
}
Изменю файл Program.cs
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;
namespace KafkaProducer
{
class Program
{
static void Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
Console.ReadKey();
}
private static IHostBuilder CreateHostBuilder(string[] args) =>
Host
.CreateDefaultBuilder(args)
.ConfigureServices((context, collection) =>
collection.AddHostedService<KafkaProducerService>());
}
}
В проект KafkaConsumer добавлю класс KafkaConsumerService
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Threading;
using System.Threading.Tasks;
namespace KafkaConsumer
{
public class KafkaConsumerService : IHostedService
{
private readonly ILogger<KafkaConsumerService> _logger;
private readonly IConsumer<Ignore, string> _consumer;
public KafkaConsumerService(ILogger<KafkaConsumerService> logger)
{
_logger = logger;
var config = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "demo-group",
AutoOffsetReset = AutoOffsetReset.Earliest
};
_consumer = new ConsumerBuilder<Ignore, string>(config).Build();
}
public Task StartAsync(CancellationToken cancellationToken)
{
_consumer.Subscribe("demo-topic");
while (!cancellationToken.IsCancellationRequested)
{
var consumeResult = _consumer.Consume(cancellationToken);
_logger.LogInformation($"Received >> {consumeResult.Message.Value}");
}
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_consumer?.Dispose();
_logger.LogInformation($"{nameof(KafkaConsumerService)} stopped");
return Task.CompletedTask;
}
}
}
Изменю файл Program.cs
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;
namespace KafkaConsumer
{
class Program
{
static void Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
Console.ReadKey();
}
private static IHostBuilder CreateHostBuilder(string[] args) =>
Host
.CreateDefaultBuilder(args)
.ConfigureServices((context, collection) =>
collection.AddHostedService<KafkaConsumerService>());
}
}
Результат работы приложений (ссылка на репозиторий)
===========
Источник:
habr.com
===========
Похожие новости:
- [Python, .NET, История IT] Языку программирования Python исполнилось 30 лет
- [Программирование, .NET, ASP, C#] Реализуем глобальную обработку исключений в ASP.NET Core приложении (перевод)
- [.NET] Как изменить формат данных JSON на Snake Case в ASP.NET Core Web API
- [.NET, PowerShell, Visual Studio, C#, F#] Работаем с notebook в VS Code с помощью расширения «dotnet interactive»
- [.NET, IT-инфраструктура, C#, DevOps] ProcInsp — веб-диспетчер задач для Windows
- [.NET, C#, Разработка под Linux, Разработка под Windows] Путешествие в unmanaged code: туда и обратно
- [.NET, C#] Делаем фильтры «как в экселе» на ASP.NET Core
- [Информационная безопасность, Программирование, .NET, C#, Разработка под Windows] Как следить (наблюдать) за компьютером. Часть 1 — делаем скриншоты пользователей
- [Kotlin, Конференции] Серия вебинаров по серверной разработке на Kotlin. Продолжение
- [Программирование, .NET, Разработка под MacOS, Разработка под Windows] От WPF к Авалонии
Теги для поиска: #_.net, #_.net, #_kafka, #_.net
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 19:17
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
Прежде чем начать я бы хотел отметить, что это всего лишь небольшой туториал по быстрому старту для тех кто, как и я, ни разу не использовал Kafka на практике.И так приступим!Единственный брокер Kafka и необходимый для его работы ZooKeeperя буду запускать в Docker.Сперва создам отдельную сеть kafkanet docker network create kafkanet
Запуск контейнера с ZooKeeper docker run -d --network=kafkanet --name=zookeeper -e ZOOKEEPER_CLIENT_PORT=2181 -e ZOOKEEPER_TICK_TIME=2000 -p 2181:2181 confluentinc/cp-zookeeper
Запуск контейнера с Kafka docker run -d --network=kafkanet --name=kafka -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 -p 9092:9092 confluentinc/cp-kafka
Для того чтобы убедиться в отсутствии ошибок, можно вывести лог docker logs kafkaДалее проверю функционирование брокера Kafka, выполнив простые операции, включающие создание тестовой темы, генерацию сообщений и их потреблениеДля этого сценария подключусь к контейнеру kafka docker exec -it kafka bash
Создам топик demo-topic /bin/kafka-topics --create --topic demo-topic --bootstrap-server kafka:9092
/bin/kafka-topics --list --zookeeper zookeeper:2181
/bin/kafka-topics --describe --topic demo-topic --bootstrap-server kafka:9092
Сгенерирую несколько сообщений /bin/kafka-console-producer --topic demo-topic --bootstrap-server kafka:9092
/bin/kafka-console-consumer --topic demo-topic --from-beginning --bootstrap-server kafka:9092
Далее я создам два небольших .NET приложения: KafkaProducer, которое будет генерировать сообщения, и KafkaConsumer, которое будет потреблять сообщения. Для реализации мне понадобятся пакеты Confluent.Kafka и Microsoft.Extensions.Hosting.В проект KafkaProducer добавлю класс KafkaProducerService using Confluent.Kafka;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using System.Threading; using System.Threading.Tasks; namespace KafkaProducer { public class KafkaProducerService : IHostedService { private readonly ILogger<KafkaProducerService> _logger; private readonly IProducer<Null, string> _producer; public KafkaProducerService(ILogger<KafkaProducerService> logger) { _logger = logger; var config = new ProducerConfig { BootstrapServers = "localhost:9092" }; _producer = new ProducerBuilder<Null, string>(config).Build(); } public async Task StartAsync(CancellationToken cancellationToken) { for (var i = 0; i < 5; i++) { var value = $"Event N {i}"; _logger.LogInformation($"Sending >> {value}"); await _producer.ProduceAsync( "demo-topic", new Message<Null, string> { Value = value }, cancellationToken); } } public Task StopAsync(CancellationToken cancellationToken) { _producer?.Dispose(); _logger.LogInformation($"{nameof(KafkaProducerService)} stopped"); return Task.CompletedTask; } } } using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting; using System; namespace KafkaProducer { class Program { static void Main(string[] args) { CreateHostBuilder(args).Build().Run(); Console.ReadKey(); } private static IHostBuilder CreateHostBuilder(string[] args) => Host .CreateDefaultBuilder(args) .ConfigureServices((context, collection) => collection.AddHostedService<KafkaProducerService>()); } } using Confluent.Kafka;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using System.Threading; using System.Threading.Tasks; namespace KafkaConsumer { public class KafkaConsumerService : IHostedService { private readonly ILogger<KafkaConsumerService> _logger; private readonly IConsumer<Ignore, string> _consumer; public KafkaConsumerService(ILogger<KafkaConsumerService> logger) { _logger = logger; var config = new ConsumerConfig { BootstrapServers = "localhost:9092", GroupId = "demo-group", AutoOffsetReset = AutoOffsetReset.Earliest }; _consumer = new ConsumerBuilder<Ignore, string>(config).Build(); } public Task StartAsync(CancellationToken cancellationToken) { _consumer.Subscribe("demo-topic"); while (!cancellationToken.IsCancellationRequested) { var consumeResult = _consumer.Consume(cancellationToken); _logger.LogInformation($"Received >> {consumeResult.Message.Value}"); } return Task.CompletedTask; } public Task StopAsync(CancellationToken cancellationToken) { _consumer?.Dispose(); _logger.LogInformation($"{nameof(KafkaConsumerService)} stopped"); return Task.CompletedTask; } } } using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting; using System; namespace KafkaConsumer { class Program { static void Main(string[] args) { CreateHostBuilder(args).Build().Run(); Console.ReadKey(); } private static IHostBuilder CreateHostBuilder(string[] args) => Host .CreateDefaultBuilder(args) .ConfigureServices((context, collection) => collection.AddHostedService<KafkaConsumerService>()); } } =========== Источник: habr.com =========== Похожие новости:
|
|
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 19:17
Часовой пояс: UTC + 5