[Big Data, Хранилища данных, Data Engineering] Мультитул для управления Хранилищем Данных — кейс Wheely + dbt
Автор
Сообщение
news_bot ®
Стаж: 6 лет 9 месяцев
Сообщений: 27286
Уже более двух лет data build tool активно используется в компании Wheely для управления Хранилищем Данных. За это время накоплен немалый опыт, мы на тернистом пути проб и ошибок к совершенству в Analytics Engineering.Несмотря на то, что в русскоязычном сегменте уже есть несколько публикаций, посвященных применению dbt, всё ещё нельзя говорить о широкой популярности и интересе, которые продукт стремительно обретает на Западе.Поэтому сегодня я предлагаю вам экскурсию по Хранилищу Данных Wheely. В формат публикации я попытался уложить самые яркие моменты и впечатления от использования dbt, снабдив реальными примерами, практиками и опытом. Добро пожаловать под кат.Структура превыше всего
Измерять сложность Хранилища Данных в количестве гигабайт сегодня - дурной тон
Налить кучу тяжело интерпретируемых данных без метаинформации (читай мусора) не составит большого труда. Гораздо сложнее из этих данных получить что-то осмысленное. То, на что с уверенностью могут опираться business stakeholders, принимая решения. То, что регулярно измеряется на предмет качества и актуальности. Наконец, то, что соответствует принципам Keep it simple (KISS) и Don’t repeat yourself (DRY).Первостепенным элементом я считаю прозрачность структуры Хранилища Данных. Чаще всего DWH выстраивается согласно многослойной логике, где каждому этапу соответствует набор преобразований, детали реализации которого скрыты для последующих слоев (элемент абстракции).
Схема слоев Хранилища ДанныхЗеленым цветом – слой источников данных sources. Это реплики структур и таблиц из исходных систем, которые поддерживаются ELT-сервисом. Данные синхронизируются 1:1 с источником, без каких-либо преобразований. Опциональный слой flatten позволяет вложенные иерархические структуры (JSON) превратить в плоские таблицы. Слой staging предназначен для простых преобразований: переименование полей, преобразование типов, расчет новых колонок с помощью конструкции case. На этом этапе мы готовим почву для дальнейших преобразований, приводим всё к единому виду и неймингу.Intermediate или промежуточный слой отвечает за формирование предварительных таблиц и агрегатов, где происходит обогащение данных. Для ряда бизнес-областей мы не используем этот слой, для других логика может насчитывать до 5-10 взаимосвязанных моделей. Кульминацией являются data marts или Витрины Данных, которые используются Data Scientists / Business Users / BI tools. Слой, в свою очередь, делится на:
- dimensions: пользователи, компании, машины, водители, календарь
- facts: поездки, транзакции, сеансы, продвижения, коммуникации
- looker: материализованные представления и витрины, оптимизированные под чтение из BI-системы
Число 120 из заголовка публикации относится только к витринам данных:
Running with dbt=0.19.0
Found 273 models, 493 tests, 6 snapshots, 4 analyses, 532 macros, 7 operations, 8 seed files, 81 sources, 0 exposures
На текущий момент в проекте:
- 273 модели во всех перечисленных слоях
- 493 теста на эти модели, включая not null, unique, foreign key, accepted values
- 6 снапшотов для ведения истории SCD (slowly changing dimensions)
- 532 макроса (большая часть из которых импортирована из сторонних модулей)
- 7 operations включая vacuum + analyze
- 81 источник данных
Помимо разбиения на логические слои, Хранилище можно нарезать по бизнес-областям. В случае необходимости есть возможность пересчитать или протестировать витрины, относящиеся к вертикалям Marketing / Supply / Growth / B2B. Например, в случае late arriving data или ручных корректировках маппингов/справочников.Осуществляется это за счет присвоения моделям и витринам тегов, а также за счет богатых возможностей синтаксиса выбора моделей. Запустить расчет всех витрин вертикали Marketing и их вышестоящие зависимости:
dbt run -m +tag:marketing
Этот же принцип лежит в основе организации кодой базы. Все скрипты объединены в директории с общей логикой и понятными наименованиями. Сложно потеряться даже при огромном количестве моделей и витрин:Иерархия проекта dbt
.
|____staging
| |____webhook
| |____receipt_prod
| |____core
| |____wheely_prod
| |____flights_prod
| |____online_hours_prod
| |____external
| |____financial_service
|____marts
| |____looker
| |____dim
| |____snapshots
| |____facts
|____flatten
| |____webhook
| |____receipt_prod
| |____wheely_prod
| |____communication_prod
|____audit
|____sources
|____aux
| |____dq
| | |____marts
| | |____external
|____intermediate
Оптимизация физической моделиЛогическое разделение на слои и области - это замечательно. Но не менее важно и то, как эта логика ложится на конкретную СУБД. В случае Wheely это Amazon Redshift.Подход с декомпозицией позволит разбить логику на понятные части, которые можно рефакторить по отдельности. Одновременно это помогает оптимизатору запросов подобрать лучший план выполнения. По такому принципу реализована одна из центральных витрин – journeys (поездки).
Цепочка зависимостей витрины поездок (journeys)На этапе обогащения данных важна скорость склейки таблиц (join performance), поэтому данные сегментированы и отсортированы в одинаковом ключе, начиная с sources. Это позволит использовать самый быстрый вид соединения - sort merge join:Конфигурация для оптимального соединения – sort merge join
{{
config(
materialized='table',
unique_key='request_id',
dist="request_id",
sort="request_id"
)
}}
Витрина же хранится отсортированной по самым популярным колонкам доступа: city, country, completed timestamp, service group. В случае правильного подбора колонок Interleaved key позволяет значительно оптимизировать I/O и ускорить отрисовку графиков в BI-системах.Конфигурация для быстрого чтения витрины – interleaved sortkey
{{
config(
materialized='table',
unique_key='request_id',
dist="request_id",
sort_type='interleaved',
sort=["completed_ts_loc"
, "city"
, "country"
, "service_group"
, "is_airport"
, "is_wheely_journey"]
)
}}
При этом часть моделей есть смысл материализовать в виде views (виртуальных таблиц), не занимающих дисковое пространство в СУБД. Так, слой staging, не содержащий сложных преобразований, конфигурируется на создание в виде представлений на уровне проекта:
staging:
+materialized: view
+schema: staging
+tags: ["staging"]
Другой интересный пример – результаты проверки качества данных. Выбранный тип материализации – ephemeral, т.е. на уровне СУБД не будет создано ни таблицы, ни представления. При каждом обращении к такой модели будет выполнен лишь запрос. Результат такого запроса является слагаемым в суммарной таблице, содержащей метрики всех проверяемых объектов.В свою очередь большие таблицы фактов имеет смысл наполнять инкрементально. Особенно при условии того, что факт, случившийся однажды, больше не меняет своих характеристик. Таким образом мы процессим только изменения (delta) – новые факты, произошедшие после последнего обновления витрины. Обратите внимание на условие where:Пример инкрементального наполнения витрины
{{
config(
materialized='incremental',
sort='metadata_timestamp',
dist='fine_id',
unique_key='id'
)
}}
with fines as (
select
fine_id
, city_id
, amount
, details
, metadata_timestamp
, created_ts_utc
, updated_ts_utc
, created_dt_utc
from {{ ref('stg_fines') }}
where true
-- filter fines arrived since last processed time
{% if is_incremental() -%}
and metadata_timestamp > (select max(metadata_timestamp) from {{ this }})
{%- endif %}
),
...
Кстати, о принципах MPP и о том, как выжать максимум из аналитических СУБД я рассказываю на курсах Data Engineer и Data Warehouse Analyst (скоро первый запуск!).SQL + Jinja = FlexibilityВысокоуровневый декларативный язык SQL прекрасен сам по себе, но вкупе с движком шаблонизации Jinja он способен творить чудеса.Любой код, который вы используете с dbt проходит этапы compile & run. На этапе компиляции интерпретируются все шаблонизированные выражения и переменные. На этапе запуска код оборачивается в конструкцию CREATE в зависимости от выбранного типа материализации и фишек используемой СУБД: clustered by / distributed by / sorted by. Рассмотрим пример:Model code:
{{
config(
materialized='table',
dist="fine_id",
sort="created_ts_utc"
)
}}
with details as (
select
{{
dbt_utils.star(from=ref('fine_details_flatten'),
except=["fine_amount", "metadata_timestamp", "generated_number"]
)
}}
from {{ ref('fine_details_flatten') }}
where fine_amount > 0
)
select * from details
Compiled code:
with details as (
select
"id",
"fine_id",
"city_id",
"amount",
"description",
"created_ts_utc",
"updated_ts_utc",
"created_dt_utc"
from "wheely"."dbt_test_akozyr"."fine_details_flatten"
where fine_amount > 0
)
select * from details
Run code:
create table
"wheely"."dbt_test_akozyr"."f_chauffeurs_fines"
diststyle key distkey (fine_id)
compound sortkey(created_ts_utc)
as (
with details as (
select
"id",
"fine_id",
"city_id",
"amount",
"description",
"created_ts_utc",
"updated_ts_utc",
"created_dt_utc"
from "wheely"."dbt_test_akozyr"."fine_details_flatten"
where fine_amount > 0
)
select * from details
);
Ключевым моментом является тот факт, что пишете вы только лаконичный шаблонизированный код, а остальным занимается движок dbt. Написание boilerplate code сведено к минимуму. Фокус инженера или аналитика остается преимущественно на реализуемой логике.Во-вторых, как происходит выстраивание цепочки связей и очередности создания витрин, продемонстрированные на картинках выше? Внимательный читатель уже заметил, что в рамках написания кода при ссылках на другие модели нет хардкода, но есть конструкция {{ ref('fine_details_flatten') }}– ссылка на наименование другой модели. Она и позволяет распарсить весь проект и построить граф связей и зависимостей. Так что это тоже делается абсолютно прозрачным и органичным способом.С помощью шаблонизации Jinja в проекте Wheely мы гибко управляем схемами данных и разделением сред dev / test / prod. В зависимости от метаданных подключения к СУБД будет выбрана схема и период исторических данных. Продакшн модели создаются в целевых схемах под технической учетной записью. Аналитики же ведут разработку каждый в своей личной песочнице, ограниченной объемом данных в 3-е последних суток. Это реализуется с помощью макроса:Макрос управления схемами для подключений:
{% macro generate_schema_name_for_env(custom_schema_name, node) -%}
{%- set default_schema = target.schema -%}
{%- if target.name == 'prod' and custom_schema_name is not none -%}
{{ custom_schema_name | trim }}
{%- else -%}
{{ default_schema }}
{%- endif -%}
{%- endmacro %}
Еще одним важным преимуществом является самодокументируемый код. Иными словами, из репозитория проекта автоматически можно собрать статический сайт с документацией: перечень слоев, моделей, атрибутный состав, метаинформацию о таблицах в СУБД и даже визуализировать граф зависимостей (да-да, картинки выше именно оттуда).
Не повторяйся – лучше подготовь макросОднотипный код, повторяющиеся обращения и действия, зачастую реализуемые по принципу copy-paste нередко являются причиной ошибок и багов. В Wheely мы придерживаемся принципа Do not repeat yourself и любой сколько-нибудь похожий код шаблонизируем в макрос с параметрами. Писать и поддерживать такой код становится сплошным удовольствием. Простой пример с конвертацией валют:
-- currency conversion macro
{% macro convert_currency(convert_column, currency_code_column) -%}
( {{ convert_column }} * aed )::decimal(18,4) as {{ convert_column }}_aed
, ( {{ convert_column }} * eur )::decimal(18,4) as {{ convert_column }}_eur
, ( {{ convert_column }} * gbp )::decimal(18,4) as {{ convert_column }}_gbp
, ( {{ convert_column }} * rub )::decimal(18,4) as {{ convert_column }}_rub
, ( {{ convert_column }} * usd )::decimal(18,4) as {{ convert_column }}_usd
{%- endmacro %}
Вызов макроса из модели:
select
...
-- price_details
, r.currency
, {{ convert_currency('price', 'currency') }}
, {{ convert_currency('transfer_min_price', 'currency') }}
, {{ convert_currency('discount', 'currency') }}
, {{ convert_currency('insurance', 'currency') }}
, {{ convert_currency('tips', 'currency') }}
, {{ convert_currency('parking', 'currency') }}
, {{ convert_currency('toll_road', 'currency') }}
, {{ convert_currency('pickup_charge', 'currency') }}
, {{ convert_currency('cancel_fee', 'currency') }}
, {{ convert_currency('net_bookings', 'currency') }}
, {{ convert_currency('gross_revenue', 'currency') }}
, {{ convert_currency('service_charge', 'currency') }}
...
from {{ ref('requests_joined') }} r
По большому счету, макрос это просто вызов функции с передачей аргументов, на уже знакомом вам диалекте Jinja. Результатом работы макроса является готовый к исполнению SQL-скрипт. Макрос для кросс-сверки значений в колонках:Сравнить значения двух колонок
-- compare two columns
{% macro dq_compare_columns(src_column, trg_column, is_numeric=false) -%}
{%- if is_numeric == true -%}
{%- set src_column = 'round(' + src_column + ', 2)' -%}
{%- set trg_column = 'round(' + trg_column + ', 2)' -%}
{%- endif -%}
CASE
WHEN {{ src_column }} = {{ trg_column }} THEN 'match'
WHEN {{ src_column }} IS NULL AND {{ trg_column }} IS NULL THEN 'both null'
WHEN {{ src_column }} IS NULL THEN 'missing in source'
WHEN {{ trg_column }} IS NULL THEN 'missing in target'
WHEN {{ src_column }} <> {{ trg_column }} THEN 'mismatch'
ELSE 'unknown'
END
{%- endmacro %}
В макрос можно запросто записать даже создание UDF-функций:Создать UDF
-- cast epoch as human-readable timestamp
{% macro create_udf() -%}
{% set sql %}
CREATE OR REPLACE FUNCTION {{ target.schema }}.f_bitwise_to_delimited(bitwise_column BIGINT, bits_in_column INT)
RETURNS VARCHAR(512)
STABLE
AS $$
# Convert column to binary, strip "0b" prefix, pad out with zeroes
if bitwise_column is not None:
b = bin(bitwise_column)[2:].zfill(bits_in_column)[:bits_in_column+1]
return b
else:
None
$$ LANGUAGE plpythonu
;
CREATE OR REPLACE FUNCTION {{ target.schema }}.f_decode_access_flags(access_flags INT, deleted_at TIMESTAMP)
RETURNS VARCHAR(128)
STABLE
AS $$
SELECT nvl(
DECODE($2, null, null, 'deleted')
, DECODE(LEN(analytics.f_bitwise_to_delimited($1, 7))::INT, 7, null, 'unknown')
, DECODE(analytics.f_bitwise_to_delimited($1, 7)::INT, 0, 'active', null)
, DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 1, 1), 1, 'end_of_life', null)
, DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 7, 1), 1, 'pending', null)
, DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 6, 1), 1, 'rejected', null)
, DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 5, 1), 1, 'blocked', null)
, DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 4, 1), 1, 'expired_docs', null)
, DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 3, 1), 1, 'partner_blocked', null)
, DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 2, 1), 1, 'new_partner', null)
)
$$ LANGUAGE SQL
;
{% endset %}
{% set table = run_query(sql) %}
{%- endmacro %}
Параметризовать можно и довольно сложные вещи, такие как работа с nested structures (иерархическими структурами) и выгрузка во внешние таблицы (external tables) в S3 в формате parquet. Эти примеры вполне достойны отдельных публикаций.Не изобретай велосипед – импортируй модулиМодуль или package - это набор макросов, моделей, тестов, который можно импортировать в свой проект в виде готовой к использованию библиотеки. На портале dbt hub есть неплохая подборка модулей на любой вкус, и, что самое главное, их список постоянно пополняется.
С помощью модуля логирования и добавления 2 простых hooks на каждый запуск dbt у меня как на ладони появляется статистическая информация о времени, продолжительности, флагах и параметрах развертывания. Я наглядно вижу модели анти-лидеры по потребляемым ресурсам (первые кандидаты на рефакторинг):
models:
+pre-hook: "{{ logging.log_model_start_event() }}"
+post-hook: "{{ logging.log_model_end_event() }}"
Мониторинг развертывания dbt моделей на кластере RedshiftИзмерение календаря собирается в одну строку, при этом набор колонок поражает:
{{ dbt_date.get_date_dimension('2012-01-01', '2025-12-31') }}
Измерение календарь, сгенерированное макросомС помощью модуля dbt_external_tables я уже выстраиваю полноценный Lakehouse, обращаясь из Хранилища к данным, расположенным в файловом хранилище S3. К примеру, самые свежие курсы валют, получаемые через API Open Exchange Rates в формате JSON:External data stored in S3 accessed vith Redshift Spectrum
- name: external
schema: spectrum
tags: ["spectrum"]
description: "External data stored in S3 accessed vith Redshift Spectrum"
tables:
- name: currencies_oxr
description: "Currency Exchange Rates fetched from OXR API https://openexchangerates.org"
freshness:
error_after: {count: 15, period: hour}
loaded_at_field: timestamp 'epoch' + "timestamp" * interval '1 second'
external:
location: "s3://data-analytics.wheely.com/dwh/currencies/"
row_format: "serde 'org.openx.data.jsonserde.JsonSerDe'"
columns:
- name: timestamp
data_type: bigint
- name: base
data_type: varchar(3)
- name: rates
data_type: struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>
Ну и, конечно, ночью по расписанию работает VACUUM + ANALYZE, ведь Redshift это форк PostgreSQL. Дефрагментация, сортировка данных в таблицах, сбор статистик. Иначе говоря поддержание кластера в тонусе, пока dba спит.
dbt run-operation redshift_maintenance --args '{include_schemas: ["staging", "flatten", "intermediate", "analytics", "meta", "snapshots", "ad_hoc"]}'
VACUUM + ANALYZERunning in production: используем dbt Cloud в Wheelydbt Cloud это платный сервис для управления проектами, основанными на движке dbt. За небольшие деньги команда получает возможность создавать окружения, конфигурировать джобы и таски, устанавливать расписание запусков, и даже полноценную IDE (среду разработки!) в браузере.Прежде всего речь идет об удобстве использования: приятный и понятный визуальный интерфейс, простота поиска и ориентирования, акцентирование ключевой информации при разборе ошибок и чтении логов:
Во-вторых, это гибкие настройки условий запуска джобов. Начиная от простых условий с выбором дня недели и времени, продолжая кастомными cron-выражениями, и заканчивая триггером запуска через webhook. Например, именно через вебхук мы связываем в цепочку завершение выгрузок для кросс-сверки и начало расчета соответствующих витрин в Хранилище (kicked off from Airflow):
В третьих, это консолидация всех важных уведомлений одном месте. Для нашей команды это канал в Slack и любые проблемы связанные с Production-запусками. В режиме реального времени мы получаем все уведомления об инцидентах с деталями и ссылками на подробный лог.
Сам dbt является проектом с открытым исходным кодом, и использование продукта dbt Cloud представляется очень удобным, но не обязательным. В качестве альтернативных способов можно выбрать любой другой оркестратор: Airflow, Prefect, Dagster, и даже просто cron. В своем проекте Сквозная Аналитика я организую оркестрацию при помощи Github Actions. Выходит очень занятно.Вместо заключенияВ команде аналитики Wheely мы стремимся к тому, чтобы работа была наполнена смыслом и приносила удовлетворение и пользу, но не раздражение и негодование. Все перечисленные в публикации достоинства не могут не вызвать симпатию со стороны новых членов команды и значительно ускоряют процессы адаптации и onboarding.Сегодня бизнес и команда активно растут. Доступен ряд интересных позиций:
- Head of Data Insights - https://wheely.com/ru/careers/4425384003
- Product Analyst, Backoffice - https://wheely.com/ru/careers/4308521003
- Product Analyst, Business - https://wheely.com/ru/careers/4425290003
- Product Analyst, Chauffeur growth - https://wheely.com/ru/careers/4185132003
- Product Analyst, Marketplace - https://wheely.com/ru/careers/4425328003
- Product Analyst, Passenger growth - https://wheely.com/ru/careers/4194291003
У тебя есть возможность узнать детали из первых уст и получить прямую рекомендацию.Также время от времени я провожу вебинары и выступления, на которых подробнее рассказываю о своей работе и проектах. Следить за моими публикациями можно в телеграм-канале Technology Enthusiast – https://t.me/enthusiastechПишите, задавайте вопросы и, конечно, пробуйте dbt в своих проектах!
===========
Источник:
habr.com
===========
Похожие новости:
- [Хранение данных, Хранилища данных, Законодательство в IT, Видеотехника] Слежка за подчиненными и закон
- [Облачные вычисления, Разработка под e-commerce, Управление e-commerce, Облачные сервисы] Вебинар «Технологии, которые позволяют E-commerce опередить конкурентов» 6 апреля
- [Big Data] 7 распространенных ошибок, которые нужно проверить при отладке DAG-файлов Airflow (перевод)
- [Python, HTML, Big Data, Визуализация данных, Веб-аналитика] Аналитика алкогольной продукции сети магазинов «Лента»
- [Big Data, Accessibility, Машинное обучение, Искусственный интеллект] Дискриминация в алгоритмах ML существует — и нет, это не либеральные сказки
- [Open source, Программирование, Геоинформационные сервисы, Визуализация данных, Научно-популярное] Google Earth Engine (GEE) как общедоступный каталог больших геоданных
- [Разработка мобильных приложений, Git, Big Data, Машинное обучение] DVC — Git для данных на примере ML-проекта
- [Разработка мобильных приложений, Разработка под Android, Kotlin] Android + Redux = <3
- [Информационная безопасность, Big Data] ДИТ Москвы обновит платформу анализа поведения интернет-пользователей
- [Apache, Big Data, Data Engineering] Умные погодные приложения с Flink SQL (перевод)
Теги для поиска: #_big_data, #_hranilischa_dannyh (Хранилища данных), #_data_engineering, #_dbt, #_dwh, #_hranilische_dannyh (хранилище данных), #_data_build_tool, #_blog_kompanii_wheely (
Блог компании Wheely
), #_big_data, #_hranilischa_dannyh (
Хранилища данных
), #_data_engineering
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 13:37
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
Уже более двух лет data build tool активно используется в компании Wheely для управления Хранилищем Данных. За это время накоплен немалый опыт, мы на тернистом пути проб и ошибок к совершенству в Analytics Engineering.Несмотря на то, что в русскоязычном сегменте уже есть несколько публикаций, посвященных применению dbt, всё ещё нельзя говорить о широкой популярности и интересе, которые продукт стремительно обретает на Западе.Поэтому сегодня я предлагаю вам экскурсию по Хранилищу Данных Wheely. В формат публикации я попытался уложить самые яркие моменты и впечатления от использования dbt, снабдив реальными примерами, практиками и опытом. Добро пожаловать под кат.Структура превыше всего Измерять сложность Хранилища Данных в количестве гигабайт сегодня - дурной тон
Схема слоев Хранилища ДанныхЗеленым цветом – слой источников данных sources. Это реплики структур и таблиц из исходных систем, которые поддерживаются ELT-сервисом. Данные синхронизируются 1:1 с источником, без каких-либо преобразований. Опциональный слой flatten позволяет вложенные иерархические структуры (JSON) превратить в плоские таблицы. Слой staging предназначен для простых преобразований: переименование полей, преобразование типов, расчет новых колонок с помощью конструкции case. На этом этапе мы готовим почву для дальнейших преобразований, приводим всё к единому виду и неймингу.Intermediate или промежуточный слой отвечает за формирование предварительных таблиц и агрегатов, где происходит обогащение данных. Для ряда бизнес-областей мы не используем этот слой, для других логика может насчитывать до 5-10 взаимосвязанных моделей. Кульминацией являются data marts или Витрины Данных, которые используются Data Scientists / Business Users / BI tools. Слой, в свою очередь, делится на:
Running with dbt=0.19.0
Found 273 models, 493 tests, 6 snapshots, 4 analyses, 532 macros, 7 operations, 8 seed files, 81 sources, 0 exposures
dbt run -m +tag:marketing
.
|____staging | |____webhook | |____receipt_prod | |____core | |____wheely_prod | |____flights_prod | |____online_hours_prod | |____external | |____financial_service |____marts | |____looker | |____dim | |____snapshots | |____facts |____flatten | |____webhook | |____receipt_prod | |____wheely_prod | |____communication_prod |____audit |____sources |____aux | |____dq | | |____marts | | |____external |____intermediate Цепочка зависимостей витрины поездок (journeys)На этапе обогащения данных важна скорость склейки таблиц (join performance), поэтому данные сегментированы и отсортированы в одинаковом ключе, начиная с sources. Это позволит использовать самый быстрый вид соединения - sort merge join:Конфигурация для оптимального соединения – sort merge join {{
config( materialized='table', unique_key='request_id', dist="request_id", sort="request_id" ) }} {{
config( materialized='table', unique_key='request_id', dist="request_id", sort_type='interleaved', sort=["completed_ts_loc" , "city" , "country" , "service_group" , "is_airport" , "is_wheely_journey"] ) }} staging:
+materialized: view +schema: staging +tags: ["staging"] {{
config( materialized='incremental', sort='metadata_timestamp', dist='fine_id', unique_key='id' ) }} with fines as ( select fine_id , city_id , amount , details , metadata_timestamp , created_ts_utc , updated_ts_utc , created_dt_utc from {{ ref('stg_fines') }} where true -- filter fines arrived since last processed time {% if is_incremental() -%} and metadata_timestamp > (select max(metadata_timestamp) from {{ this }}) {%- endif %} ), ... {{
config( materialized='table', dist="fine_id", sort="created_ts_utc" ) }} with details as ( select {{ dbt_utils.star(from=ref('fine_details_flatten'), except=["fine_amount", "metadata_timestamp", "generated_number"] ) }} from {{ ref('fine_details_flatten') }} where fine_amount > 0 ) select * from details with details as (
select "id", "fine_id", "city_id", "amount", "description", "created_ts_utc", "updated_ts_utc", "created_dt_utc" from "wheely"."dbt_test_akozyr"."fine_details_flatten" where fine_amount > 0 ) select * from details create table
"wheely"."dbt_test_akozyr"."f_chauffeurs_fines" diststyle key distkey (fine_id) compound sortkey(created_ts_utc) as ( with details as ( select "id", "fine_id", "city_id", "amount", "description", "created_ts_utc", "updated_ts_utc", "created_dt_utc" from "wheely"."dbt_test_akozyr"."fine_details_flatten" where fine_amount > 0 ) select * from details ); {% macro generate_schema_name_for_env(custom_schema_name, node) -%}
{%- set default_schema = target.schema -%} {%- if target.name == 'prod' and custom_schema_name is not none -%} {{ custom_schema_name | trim }} {%- else -%} {{ default_schema }} {%- endif -%} {%- endmacro %} Не повторяйся – лучше подготовь макросОднотипный код, повторяющиеся обращения и действия, зачастую реализуемые по принципу copy-paste нередко являются причиной ошибок и багов. В Wheely мы придерживаемся принципа Do not repeat yourself и любой сколько-нибудь похожий код шаблонизируем в макрос с параметрами. Писать и поддерживать такой код становится сплошным удовольствием. Простой пример с конвертацией валют: -- currency conversion macro
{% macro convert_currency(convert_column, currency_code_column) -%} ( {{ convert_column }} * aed )::decimal(18,4) as {{ convert_column }}_aed , ( {{ convert_column }} * eur )::decimal(18,4) as {{ convert_column }}_eur , ( {{ convert_column }} * gbp )::decimal(18,4) as {{ convert_column }}_gbp , ( {{ convert_column }} * rub )::decimal(18,4) as {{ convert_column }}_rub , ( {{ convert_column }} * usd )::decimal(18,4) as {{ convert_column }}_usd {%- endmacro %} select
... -- price_details , r.currency , {{ convert_currency('price', 'currency') }} , {{ convert_currency('transfer_min_price', 'currency') }} , {{ convert_currency('discount', 'currency') }} , {{ convert_currency('insurance', 'currency') }} , {{ convert_currency('tips', 'currency') }} , {{ convert_currency('parking', 'currency') }} , {{ convert_currency('toll_road', 'currency') }} , {{ convert_currency('pickup_charge', 'currency') }} , {{ convert_currency('cancel_fee', 'currency') }} , {{ convert_currency('net_bookings', 'currency') }} , {{ convert_currency('gross_revenue', 'currency') }} , {{ convert_currency('service_charge', 'currency') }} ... from {{ ref('requests_joined') }} r -- compare two columns
{% macro dq_compare_columns(src_column, trg_column, is_numeric=false) -%} {%- if is_numeric == true -%} {%- set src_column = 'round(' + src_column + ', 2)' -%} {%- set trg_column = 'round(' + trg_column + ', 2)' -%} {%- endif -%} CASE WHEN {{ src_column }} = {{ trg_column }} THEN 'match' WHEN {{ src_column }} IS NULL AND {{ trg_column }} IS NULL THEN 'both null' WHEN {{ src_column }} IS NULL THEN 'missing in source' WHEN {{ trg_column }} IS NULL THEN 'missing in target' WHEN {{ src_column }} <> {{ trg_column }} THEN 'mismatch' ELSE 'unknown' END {%- endmacro %} -- cast epoch as human-readable timestamp
{% macro create_udf() -%} {% set sql %} CREATE OR REPLACE FUNCTION {{ target.schema }}.f_bitwise_to_delimited(bitwise_column BIGINT, bits_in_column INT) RETURNS VARCHAR(512) STABLE AS $$ # Convert column to binary, strip "0b" prefix, pad out with zeroes if bitwise_column is not None: b = bin(bitwise_column)[2:].zfill(bits_in_column)[:bits_in_column+1] return b else: None $$ LANGUAGE plpythonu ; CREATE OR REPLACE FUNCTION {{ target.schema }}.f_decode_access_flags(access_flags INT, deleted_at TIMESTAMP) RETURNS VARCHAR(128) STABLE AS $$ SELECT nvl( DECODE($2, null, null, 'deleted') , DECODE(LEN(analytics.f_bitwise_to_delimited($1, 7))::INT, 7, null, 'unknown') , DECODE(analytics.f_bitwise_to_delimited($1, 7)::INT, 0, 'active', null) , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 1, 1), 1, 'end_of_life', null) , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 7, 1), 1, 'pending', null) , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 6, 1), 1, 'rejected', null) , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 5, 1), 1, 'blocked', null) , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 4, 1), 1, 'expired_docs', null) , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 3, 1), 1, 'partner_blocked', null) , DECODE(SUBSTRING(analytics.f_bitwise_to_delimited($1, 7), 2, 1), 1, 'new_partner', null) ) $$ LANGUAGE SQL ; {% endset %} {% set table = run_query(sql) %} {%- endmacro %} С помощью модуля логирования и добавления 2 простых hooks на каждый запуск dbt у меня как на ладони появляется статистическая информация о времени, продолжительности, флагах и параметрах развертывания. Я наглядно вижу модели анти-лидеры по потребляемым ресурсам (первые кандидаты на рефакторинг): models:
+pre-hook: "{{ logging.log_model_start_event() }}" +post-hook: "{{ logging.log_model_end_event() }}" Мониторинг развертывания dbt моделей на кластере RedshiftИзмерение календаря собирается в одну строку, при этом набор колонок поражает: {{ dbt_date.get_date_dimension('2012-01-01', '2025-12-31') }}
Измерение календарь, сгенерированное макросомС помощью модуля dbt_external_tables я уже выстраиваю полноценный Lakehouse, обращаясь из Хранилища к данным, расположенным в файловом хранилище S3. К примеру, самые свежие курсы валют, получаемые через API Open Exchange Rates в формате JSON:External data stored in S3 accessed vith Redshift Spectrum - name: external
schema: spectrum tags: ["spectrum"] description: "External data stored in S3 accessed vith Redshift Spectrum" tables: - name: currencies_oxr description: "Currency Exchange Rates fetched from OXR API https://openexchangerates.org" freshness: error_after: {count: 15, period: hour} loaded_at_field: timestamp 'epoch' + "timestamp" * interval '1 second' external: location: "s3://data-analytics.wheely.com/dwh/currencies/" row_format: "serde 'org.openx.data.jsonserde.JsonSerDe'" columns: - name: timestamp data_type: bigint - name: base data_type: varchar(3) - name: rates data_type: struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8> dbt run-operation redshift_maintenance --args '{include_schemas: ["staging", "flatten", "intermediate", "analytics", "meta", "snapshots", "ad_hoc"]}'
VACUUM + ANALYZERunning in production: используем dbt Cloud в Wheelydbt Cloud это платный сервис для управления проектами, основанными на движке dbt. За небольшие деньги команда получает возможность создавать окружения, конфигурировать джобы и таски, устанавливать расписание запусков, и даже полноценную IDE (среду разработки!) в браузере.Прежде всего речь идет об удобстве использования: приятный и понятный визуальный интерфейс, простота поиска и ориентирования, акцентирование ключевой информации при разборе ошибок и чтении логов: Во-вторых, это гибкие настройки условий запуска джобов. Начиная от простых условий с выбором дня недели и времени, продолжая кастомными cron-выражениями, и заканчивая триггером запуска через webhook. Например, именно через вебхук мы связываем в цепочку завершение выгрузок для кросс-сверки и начало расчета соответствующих витрин в Хранилище (kicked off from Airflow): В третьих, это консолидация всех важных уведомлений одном месте. Для нашей команды это канал в Slack и любые проблемы связанные с Production-запусками. В режиме реального времени мы получаем все уведомления об инцидентах с деталями и ссылками на подробный лог. Сам dbt является проектом с открытым исходным кодом, и использование продукта dbt Cloud представляется очень удобным, но не обязательным. В качестве альтернативных способов можно выбрать любой другой оркестратор: Airflow, Prefect, Dagster, и даже просто cron. В своем проекте Сквозная Аналитика я организую оркестрацию при помощи Github Actions. Выходит очень занятно.Вместо заключенияВ команде аналитики Wheely мы стремимся к тому, чтобы работа была наполнена смыслом и приносила удовлетворение и пользу, но не раздражение и негодование. Все перечисленные в публикации достоинства не могут не вызвать симпатию со стороны новых членов команды и значительно ускоряют процессы адаптации и onboarding.Сегодня бизнес и команда активно растут. Доступен ряд интересных позиций:
=========== Источник: habr.com =========== Похожие новости:
Блог компании Wheely ), #_big_data, #_hranilischa_dannyh ( Хранилища данных ), #_data_engineering |
|
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 13:37
Часовой пояс: UTC + 5