[Ruby, Ruby on Rails, Администрирование баз данных, Микросервисы] Синхронизация баз данных между монолитом и микросервисами с помощью Kafka. Наше решение
Автор
Сообщение
news_bot ®
Стаж: 6 лет 9 месяцев
Сообщений: 27286
В этой статье я расскажу про готовое решение для поддержки консистентности данных между растущей микросервисной и унаследованной архитектурой. Под катом код для репликации двух баз данных с проверкой синхронизации, который может пригодиться для решения аналогичных задач.
С проблемой консистентности данных мы столкнулись при разработке микросервиса под названием Profile. Он отвечает за регистрацию новых пользователей, хранение данных о них и синхронизируется с монолитной базой. Именно в синхронизации двух баз оказалось несколько проблем.
Синхронизация данных
Чтобы система работала стабильно, когда данные меняются в одной базе данных, изменения должны автоматически отразиться на другой. Для этого мы создали очередь из таких изменений в Kafka.
Мы начали работу с таблицы студентов. К базовым колонкам с именами, фамилиями и классами добавили дополнительные — revision и foreign_revision — для работы с очередью. Теперь при добавлении или изменении данных вызывается триггер в постгресе, который записывает в поле revision текущее время с точностью до миллисекунды. Привожу код добавления колонок и триггера:
ALTER TABLE "students" ADD "revision" timestamp(6)
ALTER TABLE "students" ALTER COLUMN "revision" SET DEFAULT timezone('utc', now());
ALTER TABLE "students" ADD "foreign_revision" timestamp(6)
CREATE OR REPLACE FUNCTION increase_revision() RETURNS trigger AS $$
BEGIN
NEW.revision := timezone('utc', now());
RETURN NEW;
END
$$ LANGUAGE PLPGSQL;
CREATE TRIGGER update_revision
BEFORE UPDATE ON students
FOR EACH ROW
WHEN (old.foreign_revision is not distinct FROM new.foreign_revision and
row_to_json(old)::jsonb - 'revision' is distinct FROM row_to_json(new)::jsonb - 'revision')
EXECUTE PROCEDURE increase_revision();
После добавления студента или изменения его персональных данных формируем и отправляем сообщение в Kafka. Однако если отправить такие сообщения до закрытия транзакции, база пострадает: закончатся коннекты, из-за ошибки сети транзакция откатится. Чтобы этого не происходило, в модели мы использовали after_commit:
after_commit :push_to_exchange, on: [:create, :update]
Сервис Profile подписан на общую очередь в Kafka и либо обновляет существующую запись в таблице, либо добавляет новую.
class StudentConsumer
def consume(payload, metadata)
if record = Student.where(id: payload.id).first
record.update!(params(payload))
else
Student.create!(params(payload))
end
end
def params(payload)
hash = payload.to_h
hash[:foreign_revision] = hash[:revision]
hash.slice(*Student.column_names.map(&:to_sym))
end
end
Таким образом мы добились того, что данные консистентны в двух разных базах. Процесс состоит из четырех шагов:
- Добавляем или обновляем студента в монолите.
- Триггер проставляет текущее время в поле revision.
- Отправляем сообщение в Kafka.
- Получаем сообщение и сохраняем данные о студенте в базе сервиса Profile.
Этот алгоритм работает хорошо, пока не возникнут проблемы: упадет сеть, появятся массовые изменения через update_all или единичные — через update_column, Kafka не будет работать или будет работать медленно.
Чтобы все это не влияло на процесс синхронизации, монолит также подписан на эту очередь и записывает в поле foreign_revision ревизию, которую прочитал из Kafka:
class StudentConsumer
def consume(payload, metadata)
Student.where(id: payload.id).update_all(foreign_revision: payload.revision)
end
end
Каждые пять минут в монолите запускается воркер, который находит все строки, у которых поля ревизий не совпадают, и заново досылает их в Kafka:
module Profile::SyncShareable
def run
Student.where("foreign_revision is null or revision != foreign_revision").
where("revision < ?", Time.now - 1.minute).
order(revision: :desc).
limit(5000).
each(&:push_to_exchange)
end
end
Для ускорения этого процесса нужен условный индекс. Он будет маленького размера, потому что у большинства записей ревизии будут совпадать:
CREATE INDEX "index_studends_on_revision" ON "students" ("revision") WHERE revision <> foreign_revision
Таким образом актуальная информация о всех студентах стала доступна для чтения в сервисе Profile. Однако для изменения данных мы были вынуждены ходить в API монолита.
Чтобы вносить изменения прямо в Profile, мы задумались о двусторонней синхронизации.
Двухсторонняя синхронизация
Двустороннюю синхронизацию можно обеспечить, если зеркально повторить весь предыдущий код в сервисе Profile. Но придется решить несколько проблем.
1. Генерация уникального идентификатора
Мы не можем создать нового студента в Profile, если в монолите использован числовой ID. Решит проблему переход на строчный UUID вместо числового инкремента.
2. Синхронизация занимает существенное время
Проблема заключается в том, что данные могут обновляться в двух местах сразу. Например, если в 48 секунд произошло изменение имени в монолите, а в 49 секунд — фамилии в Profile. Теоретически это возможно при исправлениях, дополнениях, автоматической коррекции. Обмен сообщениями через Kafka может занимать дольше трех секунд, и в таком случае изменение имени потеряется из-за более новой версии данных с обновленной фамилией.
Чтобы при двусторонней синхронизации такого не происходило, можно заменить Kafka на что-то более быстрое, например, на RabbitMQ. Но в Kafka хранится журнал транзакций, и мы можем вернуться, проанализировать нашу синхронизацию, в случае аварии проработать транзакции заново. К тому же он умеет работать с двумя разными ЦОД. Для нас это было важно, и мы остались с Kafka. Хотя для кого-то, возможно, актуальнее будет быстрый Rabbit, в котором синхронизация происходит за миллисекунды, а количество воркеров можно регулировать динамически.
3. Асинхронная синхронизация
Когда мы пишем изменения в Profile, нет гарантии, что прочитаем их в монолите, — данные синхронизируются с задержкой. Это надо учитывать, когда разные части приложения написаны поверх разных сервисов. В таких местах приходится отказываться от двусторонней синхронизации и переходить на синхронное взаимодействие через REST API или gRPC.
Таким образом мы можем распределить нагрузку между монолитом и отдельным сервисом, улучшить кодовую базу, делать деплой этого сервиса независимо от монолита.
***
Как вы решали проблему консистентности данных в микросервисной архитектуре? Какой опыт бесшовного распиливания монолита у вас был?
===========
Источник:
habr.com
===========
Похожие новости:
- [MySQL, Серверная оптимизация, Администрирование баз данных] Читаем EXPLAIN на максималках
- [Программирование, Совершенный код, Управление разработкой] Код ревью: как быть хорошим автором
- [Программирование, Управление проектами, Управление персоналом, Карьера в IT-индустрии] Почему большинство программистов оказываются посредственными техлидами (перевод)
- [IT-инфраструктура, Управление продуктом, Микросервисы] Как быстро и легко интегрироваться с Active Directory
- [Системное администрирование, Программирование, IT-инфраструктура, Apache] Как Apache Kafka поддерживает 200К партиций в кластере? (перевод)
- [Системное администрирование, *nix, DevOps, Микросервисы, Kubernetes] Ломаем и чиним etcd-кластер
- [Программирование, Совершенный код] Trace, Info, Warning, Error, Fatal — кто все эти люди..?
- [Микросервисы] Микросервисы и безопасность (перевод)
- [Совершенный код, UML Design, ООП, Параллельное программирование] Как построить четкие модели классов и получить реальные преимущества от UML. Часть 4 (перевод)
- [Микросервисы] Обзор фреймворков для оркестрации микросервисов: Conductor, Zeebe, Temporal
Теги для поиска: #_ruby, #_ruby_on_rails, #_administrirovanie_baz_dannyh (Администрирование баз данных), #_mikroservisy (Микросервисы), #_kod (код), #_sinhronizatsija_baz_dannyh (синхронизация баз данных), #_kafka, #_mikroservisy (микросервисы), #_sinhronizatsija (синхронизация), #_ruby, #_ruby_on_rails, #_konsistentnost (консистентность), #_blog_kompanii_uchi.ru (
Блог компании Учи.ру
), #_ruby, #_ruby_on_rails, #_administrirovanie_baz_dannyh (
Администрирование баз данных
), #_mikroservisy (
Микросервисы
)
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 20:22
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
В этой статье я расскажу про готовое решение для поддержки консистентности данных между растущей микросервисной и унаследованной архитектурой. Под катом код для репликации двух баз данных с проверкой синхронизации, который может пригодиться для решения аналогичных задач. С проблемой консистентности данных мы столкнулись при разработке микросервиса под названием Profile. Он отвечает за регистрацию новых пользователей, хранение данных о них и синхронизируется с монолитной базой. Именно в синхронизации двух баз оказалось несколько проблем. Синхронизация данных Чтобы система работала стабильно, когда данные меняются в одной базе данных, изменения должны автоматически отразиться на другой. Для этого мы создали очередь из таких изменений в Kafka. Мы начали работу с таблицы студентов. К базовым колонкам с именами, фамилиями и классами добавили дополнительные — revision и foreign_revision — для работы с очередью. Теперь при добавлении или изменении данных вызывается триггер в постгресе, который записывает в поле revision текущее время с точностью до миллисекунды. Привожу код добавления колонок и триггера: ALTER TABLE "students" ADD "revision" timestamp(6)
ALTER TABLE "students" ALTER COLUMN "revision" SET DEFAULT timezone('utc', now()); ALTER TABLE "students" ADD "foreign_revision" timestamp(6) CREATE OR REPLACE FUNCTION increase_revision() RETURNS trigger AS $$ BEGIN NEW.revision := timezone('utc', now()); RETURN NEW; END $$ LANGUAGE PLPGSQL; CREATE TRIGGER update_revision BEFORE UPDATE ON students FOR EACH ROW WHEN (old.foreign_revision is not distinct FROM new.foreign_revision and row_to_json(old)::jsonb - 'revision' is distinct FROM row_to_json(new)::jsonb - 'revision') EXECUTE PROCEDURE increase_revision(); После добавления студента или изменения его персональных данных формируем и отправляем сообщение в Kafka. Однако если отправить такие сообщения до закрытия транзакции, база пострадает: закончатся коннекты, из-за ошибки сети транзакция откатится. Чтобы этого не происходило, в модели мы использовали after_commit: after_commit :push_to_exchange, on: [:create, :update]
Сервис Profile подписан на общую очередь в Kafka и либо обновляет существующую запись в таблице, либо добавляет новую. class StudentConsumer
def consume(payload, metadata) if record = Student.where(id: payload.id).first record.update!(params(payload)) else Student.create!(params(payload)) end end def params(payload) hash = payload.to_h hash[:foreign_revision] = hash[:revision] hash.slice(*Student.column_names.map(&:to_sym)) end end Таким образом мы добились того, что данные консистентны в двух разных базах. Процесс состоит из четырех шагов:
Этот алгоритм работает хорошо, пока не возникнут проблемы: упадет сеть, появятся массовые изменения через update_all или единичные — через update_column, Kafka не будет работать или будет работать медленно. Чтобы все это не влияло на процесс синхронизации, монолит также подписан на эту очередь и записывает в поле foreign_revision ревизию, которую прочитал из Kafka: class StudentConsumer
def consume(payload, metadata) Student.where(id: payload.id).update_all(foreign_revision: payload.revision) end end Каждые пять минут в монолите запускается воркер, который находит все строки, у которых поля ревизий не совпадают, и заново досылает их в Kafka: module Profile::SyncShareable
def run Student.where("foreign_revision is null or revision != foreign_revision"). where("revision < ?", Time.now - 1.minute). order(revision: :desc). limit(5000). each(&:push_to_exchange) end end Для ускорения этого процесса нужен условный индекс. Он будет маленького размера, потому что у большинства записей ревизии будут совпадать: CREATE INDEX "index_studends_on_revision" ON "students" ("revision") WHERE revision <> foreign_revision
Таким образом актуальная информация о всех студентах стала доступна для чтения в сервисе Profile. Однако для изменения данных мы были вынуждены ходить в API монолита. Чтобы вносить изменения прямо в Profile, мы задумались о двусторонней синхронизации. Двухсторонняя синхронизация Двустороннюю синхронизацию можно обеспечить, если зеркально повторить весь предыдущий код в сервисе Profile. Но придется решить несколько проблем. 1. Генерация уникального идентификатора Мы не можем создать нового студента в Profile, если в монолите использован числовой ID. Решит проблему переход на строчный UUID вместо числового инкремента. 2. Синхронизация занимает существенное время Проблема заключается в том, что данные могут обновляться в двух местах сразу. Например, если в 48 секунд произошло изменение имени в монолите, а в 49 секунд — фамилии в Profile. Теоретически это возможно при исправлениях, дополнениях, автоматической коррекции. Обмен сообщениями через Kafka может занимать дольше трех секунд, и в таком случае изменение имени потеряется из-за более новой версии данных с обновленной фамилией. Чтобы при двусторонней синхронизации такого не происходило, можно заменить Kafka на что-то более быстрое, например, на RabbitMQ. Но в Kafka хранится журнал транзакций, и мы можем вернуться, проанализировать нашу синхронизацию, в случае аварии проработать транзакции заново. К тому же он умеет работать с двумя разными ЦОД. Для нас это было важно, и мы остались с Kafka. Хотя для кого-то, возможно, актуальнее будет быстрый Rabbit, в котором синхронизация происходит за миллисекунды, а количество воркеров можно регулировать динамически. 3. Асинхронная синхронизация Когда мы пишем изменения в Profile, нет гарантии, что прочитаем их в монолите, — данные синхронизируются с задержкой. Это надо учитывать, когда разные части приложения написаны поверх разных сервисов. В таких местах приходится отказываться от двусторонней синхронизации и переходить на синхронное взаимодействие через REST API или gRPC. Таким образом мы можем распределить нагрузку между монолитом и отдельным сервисом, улучшить кодовую базу, делать деплой этого сервиса независимо от монолита. *** Как вы решали проблему консистентности данных в микросервисной архитектуре? Какой опыт бесшовного распиливания монолита у вас был? =========== Источник: habr.com =========== Похожие новости:
Блог компании Учи.ру ), #_ruby, #_ruby_on_rails, #_administrirovanie_baz_dannyh ( Администрирование баз данных ), #_mikroservisy ( Микросервисы ) |
|
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 20:22
Часовой пояс: UTC + 5