[Java, Apache] Как на самом деле работает auto-commit в Kafka и можем ли мы на него расчитывать?
Автор
Сообщение
news_bot ®
Стаж: 6 лет 9 месяцев
Сообщений: 27286
В этой статье я хочу чуть подробнее раскрыть как же устроен механизм auto-commit у слушателей в библиотеке kafka-clients (рассмотрим версию 2.6.0)В документации мы можем найти следующую формулировку описывающую работу auto-commit:
Auto-commit basically works as a cron with a period set through the auto.commit.interval.ms configuration property. If the consumer crashes, then after a restart or a rebalance, the position of all partitions owned by the crashed consumer will be reset to the last committed offset.
В java docs к KafkaConsumer в свою очередь присутствует следующее описание:
The consumer can either automatically commit offsets periodically; or it can choose to control this committed position manually by calling one of the commit APIs (e.g. commitSync and commitAsync).
Из этих формулировок может сложиться заблуждение о том, что неблокирующий автоматический коммит смещения (offset) происходит в фоновом режиме и не совсем понятно как он связан с процессом получения сообщений конкретным слушателем (consumer) и самое главное какие гарантии доставки мы при этом имеем?Давайте подробнее разберем механизм получения сообщений слушателем с настройкой enable.auto.commit = true на примере реализации класса KafkaConsumer из библиотеки org.apache.kafka:kafka-clients:2.6.0 Для этого рассмотрим пример приведённый в java doc KafkaConsumer:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
Как же происходит auto-commit в данном случае? Разгадку стоит искать в самом методе для получения новых сообщений
consumer.poll(Duration.ofMillis(100));
Но сперва сделаем шаг назад. При инициализации класса KafkaConsumer параметры относящиеся к auto-commit такие как enable.auto.commit и auto.commit.interval.ms предаются в экземпляр класса ConsumerCoordinator одно из назначений которого как раз и состоит в том, чтобы реализовать механизм auto-commit.Обратим внимание на метод maybeAutoCommitOffsetsAsync из этого класса
public void maybeAutoCommitOffsetsAsync(long now) {
if (autoCommitEnabled) {
nextAutoCommitTimer.update(now);
if (nextAutoCommitTimer.isExpired()) {
nextAutoCommitTimer.reset(autoCommitIntervalMs);
doAutoCommitOffsetsAsync();
}
}
}
При установленном свойстве enable.auto.commit = true и в случае если установленный в параметре auto.commit.interval.ms интервал времени с момента последнего коммита истек именно тут происходит коммит последнего смещения, а в случае неудачи поток выполнения не прерывается, а происходит просто логирование ошибки (метод doAutoCommitOffsetsAsync)
private void doAutoCommitOffsetsAsync() {
Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
log.debug("Sending asynchronous auto-commit of offsets {}", allConsumedOffsets);
commitOffsetsAsync(allConsumedOffsets, (offsets, exception) -> {
if (exception != null) {
if (exception instanceof RetriableCommitFailedException) {
log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", offsets,
exception);
nextAutoCommitTimer.updateAndReset(rebalanceConfig.retryBackoffMs);
} else {
log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage());
}
} else {
log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
}
});
}
Теперь вернемся назад в метод poll класса KafkaConsumer. Во время выполнения метода перед получением новых данных из топиков происходит вызов метода updateAssignmentMetadataIfNeeded, который в свою очередь обращается к методу poll класса ConsumerCoordinator, в самом конце, которого вызывается уже известный нам maybeAutoCommitOffsetsAsyncТаким образом собирая нашу матрешку получаем следующую последовательность действий при получении новых сообщений в методе poll класса KafkaConsumer:
- Слушатель сохраняет предыдущий offset
- Далее присходит получение новых сообщений из топиков с которыми связан данный слушатель.
Учитывая что слушатель KafkaConsumer не является потокобезопасным и выполняется одним потоком можем заключить, что операции выполняются строго последовательно. Стоит отметить что п.1 выполняется если enable.auto.commit = true и с момента прошлого коммита прошло больше времени чем указано в параметре auto.commit.interval.ms. Т.е. если метод poll() у нашего слушателя вызывается каждые 3 секунды, а значение auto.commit.interval.ms=6000, то смещение будет сохраняться каждый второй вызов.
А что же с гарантиями доставки? При такой реализации мы имеем вполне честный “at least once delivery”, что было бы затруднительно при сохранении смещений отдельным потоком.
===========
Источник:
habr.com
===========
Похожие новости:
- [Java] 9 лучших практик для обработки исключений в Java (перевод)
- [JavaScript, Java, Карьера в IT-индустрии] Битва Java-разработчиков Tech Monsters Night от «М.Видео-Эльдорадо»
- [Java, Разработка мобильных приложений, Разработка игр, Дизайн игр, Софт] 12 Tech Trends Every Java Developer Must Learn To Win The Game In 2021
- [Разработка веб-сайтов, JavaScript, TypeScript] Практическое руководство по TypeScript для разработчиков (перевод)
- [JavaScript, Интерфейсы, Отладка] Трасси… что? Доклад Яндекса
- [Тестирование IT-систем, JavaScript, Тестирование веб-сервисов] Cypress и его место в нашей тестовой пирамиде
- [Java, Алгоритмы] Алгоритм нахождения 1000 ферзей на шахматной доске
- [Программирование, Java] Отправка электронных писем с помощью Spring (перевод)
- [JavaScript, Google Chrome, HTML] Швейцарский нож отладки JavaScript
- [Разработка веб-сайтов, JavaScript, TypeScript] TypeScript: Раскладываем tsconfig по полочкам. Часть 2
Теги для поиска: #_java, #_apache, #_kafka, #_consumer, #_commit, #_java, #_java, #_apache
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 14:08
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
В этой статье я хочу чуть подробнее раскрыть как же устроен механизм auto-commit у слушателей в библиотеке kafka-clients (рассмотрим версию 2.6.0)В документации мы можем найти следующую формулировку описывающую работу auto-commit: Auto-commit basically works as a cron with a period set through the auto.commit.interval.ms configuration property. If the consumer crashes, then after a restart or a rebalance, the position of all partitions owned by the crashed consumer will be reset to the last committed offset.
The consumer can either automatically commit offsets periodically; or it can choose to control this committed position manually by calling one of the commit APIs (e.g. commitSync and commitAsync).
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); props.setProperty("group.id", "test"); props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } consumer.poll(Duration.ofMillis(100));
public void maybeAutoCommitOffsetsAsync(long now) {
if (autoCommitEnabled) { nextAutoCommitTimer.update(now); if (nextAutoCommitTimer.isExpired()) { nextAutoCommitTimer.reset(autoCommitIntervalMs); doAutoCommitOffsetsAsync(); } } } private void doAutoCommitOffsetsAsync() {
Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed(); log.debug("Sending asynchronous auto-commit of offsets {}", allConsumedOffsets); commitOffsetsAsync(allConsumedOffsets, (offsets, exception) -> { if (exception != null) { if (exception instanceof RetriableCommitFailedException) { log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", offsets, exception); nextAutoCommitTimer.updateAndReset(rebalanceConfig.retryBackoffMs); } else { log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage()); } } else { log.debug("Completed asynchronous auto-commit of offsets {}", offsets); } }); }
А что же с гарантиями доставки? При такой реализации мы имеем вполне честный “at least once delivery”, что было бы затруднительно при сохранении смещений отдельным потоком. =========== Источник: habr.com =========== Похожие новости:
|
|
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 14:08
Часовой пояс: UTC + 5