[Java, Хранение данных] Как мы Schema Registry для Kafka настраивали, и что могло пойти не так…
Автор
Сообщение
news_bot ®
Стаж: 6 лет 9 месяцев
Сообщений: 27286
Всем привет.В статье я опишу, как мы настраивали реестр схем данных для того, чтобы использовать его для сериализации и десериализации сообщений Kafka.Спойлер: на данный момент реестр схем данных настроен и используется в боевой системе, каких-то проблем, связанных с SR, замечено не было.Исходные данные (небольшое оправдание)Разработчикам не всегда выпадает возможность выбирать от и до инструменты, которые следует использовать при реализации той или иной задачи. Например, если вы приходите в команду, где уже начали реализовывать что-то на чем-то, то полностью изменить направление может быть неподъемной задачей. Таким образом, примем за данность, что используется Kafka для асинхронного взаимодействия сервисов - передачи сообщений по событиям.Что? Почему? Зачем?А вот то, что мы выбрали осознанно, так это использование Apache Avro как систему сериализации данных. Почему мы выбрали Apache Avro?Если кратко, то это:
- компактность представления бинарного формата данных (а также есть возможность кодировки в JSON);
- поддержка “логических” типов данных: BigDecimal, дата, дата/время в нативном виде;
- версионирование моделей;
Этих пунктов хватило, чтобы выбрать Avro как схему данных. А версию взяли 1.8.2, которая вышла в мае 2017 года.Последняя релизная версия Avro - 1.10.1 от декабря 2020-го. В ней решены некоторые проблемы, которые нам пришлось чинить костылями, но взять ее мы не могли по определенным причинам (так как в одном из сервисов для сериализации ответа внешней системе используется схема версии 1.8.2 и поднятие версии может привести к нарушению этого взаимодействия)До версии 1.9 в Avro используется библиотека Joda Time для обработки логических типов, связанных с датой и временем. А начиная с версии 1.9 для этого используются нативные Java 8 библиотеки.Перед непосредственно разработкой функциональности мы все-таки попробовали новую версию Avro, но испытали проблемы и конверсиями дат (например, сериализатор не смог корректно обработать LocalDateTime). Точных примеров я не вспомню, но нам нужно было очень аккуратно передавать время в разных временных зонах, а это не самая простая часть для конвертации.Несомненно, можно найти еще преимущества и недостатки этой схемы данных, но мы остановились на ней.Итак, с системой сериализации определились.Далее речь пойдет о реестре схем данных.Реестр схем данных, или The Confluent Schema Registry for Kafka. Благодаря SR можно обеспечить совместимость схем данных между продюсером и консьюмером Kafka. И, чего нам очень хотелось, SR поможет не потерять сообщения из-за ошибок сериализации и десериализации во время эволюции схемы данных.В процессе настройки и при тестовых прогонах мы выявили некоторые, скажем так, особенности использования SR, о которых я хочу рассказать вам и попытаюсь собрать все неувязки в одном месте.Настройка деплоя моделей Прежде всего, модели нужно описать в JSON-формате. Мы создали репозиторий и описали там нужные нам модели. Описывать модели в JSON просто и понятно, например:
{
"type": "record",
"name": "SomeMessage",
"namespace": "com.trthhrts",
"fields": [
{
"name": "title",
"type": "string"
}
]
}
Если хотим nullable-поле, то добавляем в тип возможность null:
{
"type": "record",
"name": "AnotherMessage",
"namespace": "com.trthhrts",
"fields": [
{
"name": "messageDate",
"type": [
"null",
{
"type": "long",
"logicalType": "timestamp-millis"
}
],
"default": null
}
]
}
Модели описали, теперь воспользуемся maven-плагином для взаимодействия с SR:
<plugin>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-maven-plugin</artifactId>
<version>6.0.1</version>
<configuration>
<schemaRegistryUrls>
<param>${SCHEMA_REGISTRY_URL}</param>
</schemaRegistryUrls>
<outputDirectory>${project.build}/avro</outputDirectory>
<userInfoConfig />
<subjects>
<com.trthhrts.SomeMessage>src/SomeMessage.avsc</com.trthhrts.SomeMessage>
</subjects>
<schemaTypes>
<com.trthhrts.SomeMessage>AVRO</com.trthhrts.SomeMessage>
</schemaTypes>
</configuration>
</plugin>
Далее смотрим здесь описание возможностей плагина, и как мы можем управлять реестром схем:
- проверить (validate) на корректность локальную схему;
- проверить изменение схемы на совместимость (test-compatibility);
- скачать схему с SR (download);
- зарегистрировать схему (register).
Мы настроили в Gitlab CI stage с деплоем схем так (ручной запуск и только на master-ветке):Примечание: ручной запуск, потому что в репозитории находятся не только модели Avro, но, например, тесты моделей, вспомогательные библиотеки для работы с Kafka. Смысла регистрировать новые версии схем, если в них не было изменений - нет.
register_schemas:
stage: deploy
script:
- mvn schema-registry:test-compatibility $MAVEN_CLI_OPTS -B
- mvn schema-registry:register $MAVEN_CLI_OPTS -B
when: manual
only:
- master
Возможно, стоит добавить фазу validate в самое начало скрипта:
register_schemas:
stage: deploy
before_script:
- *scm_connect
script:
- mvn schema-registry:validate $MAVEN_CLI_OPTS -B
- mvn schema-registry:test-compatibility $MAVEN_CLI_OPTS -B
- mvn schema-registry:register $MAVEN_CLI_OPTS -B
when: manual
only:
- master
Итак, после деплоя наши схемы успешно хранятся в Kafka. Стоп, скажете вы, в смысле в Kafka? А как же SR?! Ну, SR живет отдельно от Kafka брокеров. Это дополнительный компонент, который может быть настроен на любом Kafka кластере и который использует Kafka для хранения в том числе и схем. И, SR предоставляет пользователю REST API для взаимодействия со схемами данных.В специальном топике Kafka (<kafkastore.topic>, по умолчанию _schemas), который имеет 1 партицию и используется как WAL. И все данные, которыми оперирует SR, добавляются как сообщения в этот лог.Упрощенно взаимодействие с SR выглядит так:
Настройка использования Schema RegistryЧтобы использовать SR, достаточно добавить свойство с URL подключения к SR и назначить в качестве сериализатора и десериализатора соответствующие классы: KafkaAvroSerializer и KafkaAvroDeserializer
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
И для консьюмера, например, так:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ? extends SpecificRecordBase> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ? extends SpecificRecordBase> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(kafkaConfigs(), new StringDeserializer(), deserializer()));
factory.setErrorHandler(errorHandler());
return factory;
}
private KafkaAvroDeserializer deserializer() {
KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
deserializer.configure(kafkaConfigs(), false);
return deserializer;
}
НО...Есть несколько нюансов, которые следует учесть перед тестом и тем более перед деплоем на бой.Стратегия наименования сабжектов (subjects)Если не кратко, то: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#sr-schemas-subject-name-strategy А если кратко, то существует 3 стратегии:
- TopicNameStrategy - название сабжекта основывается на названии топика (стратегия по умолчанию!);
- RecordNameStrategy - название сабжекта основывается на названии модели данных (как способ логически объединить в группу похожие события, которые, однако, могут иметь различную структуру данных, но передавать в рамках одного топика);
- TopicRecordNameStrategy - название сабжекта основывается и на названии топика, и на названии модели данных (опять же как способ логической группировки событий с различной структурой данных);
Для себя мы выбрали стратегию наименования по названию модели данных:
props.put(AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, RecordNameStrategy.class);
Выбрали такой вариант потому что:
- топики для тестовых стендов и боевого названы по разному, а значит, выбери мы другую стратегию, в реестре схем данных копилось бы много схем для каждого топика, это привело бы к дополнительным сложностям при анализе работы и вообще поиску багов;
- возможность на разных топиках протестировать именно те схемы, которые будут использоваться в боевом окружении.
Имейте это ввиду, если тоже зададитесь вопросом "А какую стратегию выбрать?".Аналогично, можно задать стратегию наименования схемы для ключа (key) сообщения, но мы не используем ключ для сообщений (гарантируя очередность сообщений тем, что у нас задана только 1 партиция на брокерах), поэтому и задавать стратегию для ключа смысла нет. auto.register.schemasСвойство, которое отвечает за то, чтобы клиентские приложения автоматически регистрировали новые схемы в SR. Для dev-окружения это может быть полезно, но для боевого сервера все-таки лучше отключить. И да, по умолчанию это свойство активировано, имейте ввиду.Ну а мы продюсерам это свойство отключили на первых этапах внедрения SR (еще в тестовом режиме):
props.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
use.latest.versionПрименимо, только если предыдущее свойство (auto.register.schemas) выставлено в false. Если use.latest.version установлено в true, это говорит SR брать последнюю версию в сабжекте для сериализации, а не ту, которая идет вместе с данными клиента. По умолчанию false. Мы обнаружили, что сериализатор той версии, которую мы используем - 5.3.0, не задействует это свойство. Тип совместимости схемВ SR доступно несколько типов совместимости схем. Подробно о них написано, например, в документации Confluent.По умолчанию используется тип BACKWARD, который разрешает удалять поля из модели данных и добавлять optional-поля (те, которые могут быть null). Сравнение схемы идет только с последней версией, а при обновлении модели в первую очередь нужно обновлять консьюмеры, чтобы не получить ошибку десериализации (например, удалили поле, обновили продюсер, который послал сообщение без этого поля, а консьюмер будет его ожидать и свалится с ошибкой десериализации).Нас устроил тип по умолчанию, не меняли.specific.avro.readerМы захотели использовать типы BigDecimal и Timestamp в моделях. Чтобы такие типы корректно сериализовать и десериализовать, следует добавить это свойство:
put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
А так же добавить в класс SpecificData при запуске приложения нужные конверторы, например, так:
static {
SpecificData.get().addLogicalTypeConversion(new Conversions.DecimalConversion());
SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());
}
После этого, если ваши модели данных наследуются от SpecificRecordBase (что по умолчанию при типе записи record) - данные будут корректно сериализовываться и десериализовываться.Вот пример описания модели с такими логическими типами:
{
"type": "record",
"name": "SomePaymentInfoModel",
"namespace": "com.trthhrts",
"fields": [
{
"name": "paymentDate",
"type": [
"null",
{
"type": "long",
"logicalType": "timestamp-millis"
}
],
"default": null
},
{
"name": "amount",
"type": [
"null",
{
"type": "bytes",
"logicalType": "decimal",
"precision": 9,
"scale": 2
}
],
"default": null
}
]
}
КэшированиеВ KafkaAvroSerializer и KafkaAvroDeserializer используется локальное кэширование, а значит клиенты не будут каждый раз запрашивать схему у SR, только в случае отсутствия нужной схемы нужной версии в локальном кэше.Для нас это означало, что сервисы не попадут в постоянную зависимость на доступность SR и отказ SR далеко не всегда будет означать невозможность сериализации и десериализации сообщений Kafka - по крайней мере, пока жив локальный кэш в приложении.И все?Нет.В процессе тестирования и разработки мы столкнулись с не совсем очевидными вещами, о которых я расскажу, возможно, это сэкономит вам времени.Joda TimeAvro 1.8.2 использует Joda Time библиотеку для работы с полями дат и времени. А мы пишем на Java 8 и используем нативные классы для работы с датами.В процессе разработки был составлен класс утилитных методов для конвертаций дат. Мы нашли это наименьшей головной болью при передаче дат с часовым поясом с помощью Avro по Kafka.Вот так мы, например, конвертируем даты перед отправкой:
convertDate(model.getBeginDate()).ifPresent(builder::setBeginDate)
convertLocalDateToJodaDateTime(model.getPaymentDate()).ifPresent(builder::setPaymentDate);
А вот так, например, принимаем:
convertToLocalDateTime(message.getLastUpdate()).ifPresent(model::setLastUpdate);
Привожу класс целиком, собрали методы из разных уголков вселенной
/**
* Класс с утилитными методам для конвертации типов
*/
public final class MessageUtils {
/** Кол-во наносекунд в миллисекунде */
private static final int NANO_SECONDS_IN_MILLISECOND = 1_000_000;
/** Кол-во миллисекунд в секунде */
private static final int MILLISECONDS_IN_SECONDS = 1000;
/**
* Запрещаем создание
*/
private MessageUtils() {
}
/**
* Конвертирует Joda {@link DateTime} в Java 8 {@link ZonedDateTime}
* @param dateTime дата в формате Joda {@link DateTime}
* @return {@link ZonedDateTime}
*/
public static Optional<ZonedDateTime> convertToZonedDateTime(DateTime dateTime) {
return Optional.ofNullable(dateTime).map(dt -> {
DateTime localDateTime = dateTime.withZone(DateTimeZone.getDefault());
return ZonedDateTime.ofLocal(
LocalDateTime.of(
localDateTime.getYear(),
localDateTime.getMonthOfYear(),
localDateTime.getDayOfMonth(),
localDateTime.getHourOfDay(),
localDateTime.getMinuteOfHour(),
localDateTime.getSecondOfMinute(),
localDateTime.getMillisOfSecond() * NANO_SECONDS_IN_MILLISECOND),
ZoneId.of(localDateTime.getZone().getID(), ZoneId.SHORT_IDS),
ZoneOffset.ofTotalSeconds(localDateTime.getZone().getOffset(localDateTime) / MILLISECONDS_IN_SECONDS));
});
}
/**
* Конвертирует Joda {@link DateTime} в Java 8 {@link LocalDateTime}
* @param dateTime дата в формате Joda {@link DateTime}
* @return {@link LocalDateTime}
*/
public static Optional<LocalDateTime> convertToLocalDateTime(DateTime dateTime) {
return convertToZonedDateTime(dateTime).map(ZonedDateTime::toLocalDateTime);
}
/**
* Конвертирует Joda {@link DateTime} в Java 8 {@link LocalDate}
* @param dateTime дата в формате Joda {@link DateTime}
* @return {@link LocalDate}
*/
public static Optional<LocalDate> convertToLocalDate(DateTime dateTime) {
return convertToZonedDateTime(dateTime).map(ZonedDateTime::toLocalDate);
}
/**
* Конвертирует {@link LocalDate} в {@link org.joda.time.LocalDate}
* @param ld {@link LocalDate}
* @return {@link org.joda.time.LocalDate}
*/
public static Optional<org.joda.time.LocalDate> convertLocalDateToJodaLocalDate(LocalDate ld) {
return Optional.ofNullable(ld).map(date -> new org.joda.time.LocalDate(date.getYear(), date.getMonthValue(), date.getDayOfMonth()));
}
/**
* Конвертирует {@link org.joda.time.LocalDate} в {@link LocalDate}
* @param ld {@link org.joda.time.LocalDate}
* @return {@link LocalDate}
*/
public static Optional<LocalDate> convertJodaLocalDateToLocalDate(org.joda.time.LocalDate ld) {
return Optional.ofNullable(ld).map(date -> LocalDate.of(date.getYear(), date.getMonthOfYear(), date.getDayOfMonth()));
}
/**
* Конвертирует {@link LocalDateTime} в Joda {@link DateTime}
* @param ldt дата в формате {@link LocalDateTime}
* @return дата в формате Joda {@link DateTime}
*/
public static Optional<DateTime> convertLocalDateTimeToJodaDateTime(LocalDateTime ldt) {
return Optional.ofNullable(ldt)
.map(date -> new org.joda.time.LocalDateTime(date.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()).toDateTime());
}
/**
* Конвертирует {@link LocalDate} в Joda {@link DateTime}
* @param ld дата в формате {@link LocalDate}
* @return дата в формате Joda {@link DateTime}
*/
public static Optional<DateTime> convertLocalDateToJodaDateTime(LocalDate ld) {
return Optional.ofNullable(ld)
.map(date ->
new org.joda.time.LocalDate(date.getYear(), date.getMonthValue(), date.getDayOfMonth()).toDateTimeAtStartOfDay());
}
}
"avro.java.string": "String"Сразу кину ссылку на issue:https://issues.apache.org/jira/browse/AVRO-2702Если грубо, то в самой схеме, допустим, нет такой строчки. Но maven-плагин пишет в схему внутри модели именно так:
{
"type": "record",
"name": "FromMaven",
"namespace": "com.trthhrts",
"fields": [
{
"name": "message",
"type": {
"type": "string",
"avro.java.string": "String"
}
}
]
}
От этого стратегия наименования значений сабжектов по названию модели - RecordNameStrategy - отрабатывает некорректно, и мы получаем ошибку при десериализации, как будто схема не найдена.Workaround: добавить "avro.java.string": "String" для каждого string-поля в моделях (тогда будет совпадение с тем, как схему опишет maven-плагин).Решение совсем неэлегантное, если есть варианты изящнее - с удовольствием обсудим в комментариях.ВыводыВ итоге, мы внедрили Schema Registry, и успешно его используем. Инструмент очень полезный, но есть определенные подводные грабли, которые мы успешно протестировали и отложили в сторону еще на этапе разработки.
===========
Источник:
habr.com
===========
Похожие новости:
- [Информационная безопасность, JavaScript] JavaScript prototype pollution: практика поиска и эксплуатации
- [Системное администрирование, Программирование, IT-инфраструктура, Apache] 5 вещей, о которых должен знать любой разработчик Apache Kafka (перевод)
- [Программирование, Java, Kotlin, Интервью] Большой разговор с новым Kotlin Project Lead Романом Елизаровым
- [IT-инфраструктура, Хранение данных, Хранилища данных, Сетевое оборудование] Тестируем СХД OceanStor Dorado V6: производительность и отказоустойчивость
- [Управление проектами, Карьера в IT-индустрии] Как я ходил на удалённые собеседования JAVA-разработчика, чтобы лучше нанимать людей
- [Программирование, Java, Разработка мобильных приложений, Разработка под Android] Как написать простое Android ToDo-приложение на Java
- [JavaScript] Контролируем JavaScript импорты с помощью Import maps
- [JavaScript, Программирование, VueJS] Сделаем худший Vue.js в мире (перевод)
- [Java] Пишем телеграм бота на Java от А до Я
- [Разработка веб-сайтов, Проектирование и рефакторинг, Терминология IT, Хранение данных] Символы Unicode: о чём должен знать каждый разработчик (перевод)
Теги для поиска: #_java, #_hranenie_dannyh (Хранение данных), #_kafka, #_avro, #_confluent, #_blog_kompanii_alfastrahovanie (
Блог компании АльфаСтрахование
), #_java, #_hranenie_dannyh (
Хранение данных
)
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 21:33
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
Всем привет.В статье я опишу, как мы настраивали реестр схем данных для того, чтобы использовать его для сериализации и десериализации сообщений Kafka.Спойлер: на данный момент реестр схем данных настроен и используется в боевой системе, каких-то проблем, связанных с SR, замечено не было.Исходные данные (небольшое оправдание)Разработчикам не всегда выпадает возможность выбирать от и до инструменты, которые следует использовать при реализации той или иной задачи. Например, если вы приходите в команду, где уже начали реализовывать что-то на чем-то, то полностью изменить направление может быть неподъемной задачей. Таким образом, примем за данность, что используется Kafka для асинхронного взаимодействия сервисов - передачи сообщений по событиям.Что? Почему? Зачем?А вот то, что мы выбрали осознанно, так это использование Apache Avro как систему сериализации данных. Почему мы выбрали Apache Avro?Если кратко, то это:
{
"type": "record", "name": "SomeMessage", "namespace": "com.trthhrts", "fields": [ { "name": "title", "type": "string" } ] } {
"type": "record", "name": "AnotherMessage", "namespace": "com.trthhrts", "fields": [ { "name": "messageDate", "type": [ "null", { "type": "long", "logicalType": "timestamp-millis" } ], "default": null } ] } <plugin>
<groupId>io.confluent</groupId> <artifactId>kafka-schema-registry-maven-plugin</artifactId> <version>6.0.1</version> <configuration> <schemaRegistryUrls> <param>${SCHEMA_REGISTRY_URL}</param> </schemaRegistryUrls> <outputDirectory>${project.build}/avro</outputDirectory> <userInfoConfig /> <subjects> <com.trthhrts.SomeMessage>src/SomeMessage.avsc</com.trthhrts.SomeMessage> </subjects> <schemaTypes> <com.trthhrts.SomeMessage>AVRO</com.trthhrts.SomeMessage> </schemaTypes> </configuration> </plugin>
register_schemas:
stage: deploy script: - mvn schema-registry:test-compatibility $MAVEN_CLI_OPTS -B - mvn schema-registry:register $MAVEN_CLI_OPTS -B when: manual only: - master register_schemas:
stage: deploy before_script: - *scm_connect script: - mvn schema-registry:validate $MAVEN_CLI_OPTS -B - mvn schema-registry:test-compatibility $MAVEN_CLI_OPTS -B - mvn schema-registry:register $MAVEN_CLI_OPTS -B when: manual only: - master Настройка использования Schema RegistryЧтобы использовать SR, достаточно добавить свойство с URL подключения к SR и назначить в качестве сериализатора и десериализатора соответствующие классы: KafkaAvroSerializer и KafkaAvroDeserializer props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); @Bean
public ConcurrentKafkaListenerContainerFactory<String, ? extends SpecificRecordBase> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, ? extends SpecificRecordBase> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(kafkaConfigs(), new StringDeserializer(), deserializer())); factory.setErrorHandler(errorHandler()); return factory; } private KafkaAvroDeserializer deserializer() { KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer(); deserializer.configure(kafkaConfigs(), false); return deserializer; }
props.put(AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, RecordNameStrategy.class);
props.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
static {
SpecificData.get().addLogicalTypeConversion(new Conversions.DecimalConversion()); SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion()); } {
"type": "record", "name": "SomePaymentInfoModel", "namespace": "com.trthhrts", "fields": [ { "name": "paymentDate", "type": [ "null", { "type": "long", "logicalType": "timestamp-millis" } ], "default": null }, { "name": "amount", "type": [ "null", { "type": "bytes", "logicalType": "decimal", "precision": 9, "scale": 2 } ], "default": null } ] } convertDate(model.getBeginDate()).ifPresent(builder::setBeginDate)
convertLocalDateToJodaDateTime(model.getPaymentDate()).ifPresent(builder::setPaymentDate); convertToLocalDateTime(message.getLastUpdate()).ifPresent(model::setLastUpdate);
/**
* Класс с утилитными методам для конвертации типов */ public final class MessageUtils { /** Кол-во наносекунд в миллисекунде */ private static final int NANO_SECONDS_IN_MILLISECOND = 1_000_000; /** Кол-во миллисекунд в секунде */ private static final int MILLISECONDS_IN_SECONDS = 1000; /** * Запрещаем создание */ private MessageUtils() { } /** * Конвертирует Joda {@link DateTime} в Java 8 {@link ZonedDateTime} * @param dateTime дата в формате Joda {@link DateTime} * @return {@link ZonedDateTime} */ public static Optional<ZonedDateTime> convertToZonedDateTime(DateTime dateTime) { return Optional.ofNullable(dateTime).map(dt -> { DateTime localDateTime = dateTime.withZone(DateTimeZone.getDefault()); return ZonedDateTime.ofLocal( LocalDateTime.of( localDateTime.getYear(), localDateTime.getMonthOfYear(), localDateTime.getDayOfMonth(), localDateTime.getHourOfDay(), localDateTime.getMinuteOfHour(), localDateTime.getSecondOfMinute(), localDateTime.getMillisOfSecond() * NANO_SECONDS_IN_MILLISECOND), ZoneId.of(localDateTime.getZone().getID(), ZoneId.SHORT_IDS), ZoneOffset.ofTotalSeconds(localDateTime.getZone().getOffset(localDateTime) / MILLISECONDS_IN_SECONDS)); }); } /** * Конвертирует Joda {@link DateTime} в Java 8 {@link LocalDateTime} * @param dateTime дата в формате Joda {@link DateTime} * @return {@link LocalDateTime} */ public static Optional<LocalDateTime> convertToLocalDateTime(DateTime dateTime) { return convertToZonedDateTime(dateTime).map(ZonedDateTime::toLocalDateTime); } /** * Конвертирует Joda {@link DateTime} в Java 8 {@link LocalDate} * @param dateTime дата в формате Joda {@link DateTime} * @return {@link LocalDate} */ public static Optional<LocalDate> convertToLocalDate(DateTime dateTime) { return convertToZonedDateTime(dateTime).map(ZonedDateTime::toLocalDate); } /** * Конвертирует {@link LocalDate} в {@link org.joda.time.LocalDate} * @param ld {@link LocalDate} * @return {@link org.joda.time.LocalDate} */ public static Optional<org.joda.time.LocalDate> convertLocalDateToJodaLocalDate(LocalDate ld) { return Optional.ofNullable(ld).map(date -> new org.joda.time.LocalDate(date.getYear(), date.getMonthValue(), date.getDayOfMonth())); } /** * Конвертирует {@link org.joda.time.LocalDate} в {@link LocalDate} * @param ld {@link org.joda.time.LocalDate} * @return {@link LocalDate} */ public static Optional<LocalDate> convertJodaLocalDateToLocalDate(org.joda.time.LocalDate ld) { return Optional.ofNullable(ld).map(date -> LocalDate.of(date.getYear(), date.getMonthOfYear(), date.getDayOfMonth())); } /** * Конвертирует {@link LocalDateTime} в Joda {@link DateTime} * @param ldt дата в формате {@link LocalDateTime} * @return дата в формате Joda {@link DateTime} */ public static Optional<DateTime> convertLocalDateTimeToJodaDateTime(LocalDateTime ldt) { return Optional.ofNullable(ldt) .map(date -> new org.joda.time.LocalDateTime(date.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()).toDateTime()); } /** * Конвертирует {@link LocalDate} в Joda {@link DateTime} * @param ld дата в формате {@link LocalDate} * @return дата в формате Joda {@link DateTime} */ public static Optional<DateTime> convertLocalDateToJodaDateTime(LocalDate ld) { return Optional.ofNullable(ld) .map(date -> new org.joda.time.LocalDate(date.getYear(), date.getMonthValue(), date.getDayOfMonth()).toDateTimeAtStartOfDay()); } } {
"type": "record", "name": "FromMaven", "namespace": "com.trthhrts", "fields": [ { "name": "message", "type": { "type": "string", "avro.java.string": "String" } } ] } =========== Источник: habr.com =========== Похожие новости:
Блог компании АльфаСтрахование ), #_java, #_hranenie_dannyh ( Хранение данных ) |
|
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 21:33
Часовой пояс: UTC + 5