[Big Data, Data Engineering] Курсы валют и аналитика – использование обменных курсов в Хранилище Данных

Автор Сообщение
news_bot ®

Стаж: 6 лет 9 месяцев
Сообщений: 27286

Создавать темы news_bot ® написал(а)
19-Май-2021 19:32


Привет! На связи Артемий – Analytics Engineer из Wheely.Сегодня хотел бы поговорить о вопросах конвертирования финансовых показателей в разные валюты. Вопрос достаточно актуальный, так как большое количество компаний имеют мультинациональные зоны присутствия, строят аналитику глобального масштаба, готовят отчетность по международным стандартам.Покажу как этот вопрос решается с помощью современных подходов на примере кейса Wheely:
  • Расширение списка базовых валют
  • Регулярное обновление и получения актуальных курсов
  • Обеспечение корректности исторических показателей
  • Максимальное удобство и простота использования в аналитических инструментах
Велком под кат для разбора решения проблемы учета мультивалютных метрик и показателей: Open Exchange Rate, Airflow, Redshift Spectrum, dbt.Новые требования к сервису валютных курсовВ качестве legacy-источника использовался веб-сервис ЦБ РФ. Однако с изменяющимися требованиями и расширением зон присутствия компании его стало недостаточно. Например, по причине отсутствия котировки AED (дирхам ОАЭ). Для кого-то могут быть актуальны курсы криптовалют BTC, ETH, которые в веб-сервисе ЦБ РФ тоже отсутствуют.Новые требования можно суммировать следующим образом:
  • Поддержка расширенного набора базовых валют, которые отсутствуют в API ЦБ РФ
  • Получение самых актуальных котировок, включая внутридневные курсы
  • Минимизация трансформаций данных вне Хранилища Данных (лучше если их вообще нет)

Матрица новых требований к работе с курсами валютЗадачи, которые предстоит решить легко визуализировать в виде матрицы. Красным помечены области, поддержку которых предстоит добавить:
  • Интеграция нового API для уже использующихся курсов
  • Добавление новых базовых валют в выгрузку
  • Получение ретроспективных (исторических) данных по новым валютам за прошлые периоды
  • Архивирование курсов из legacy-источника
Легаси приложение по выгрузке курсов валют формировало pivot-таблицу с коэффициентом для каждой пары в отдельном столбце. Это удобно, когда у нас есть строго фиксированный набор валют и наименования колонок, но превращается в головную боль если список валют необходимо расширить. 
Появилось желание уйти от всех трансформаций и формирований таблиц в pandas до того как данные попадают в Хранилище. Здесь я придерживаюсь принципа применения всех трансформаций (T в ELT) в одном месте, и помогает мне в этом замечательный инструмент dbt.Интеграция с новым поставщиком данныхКак уже стало понятно, без внешнего поставщика данных обойтись не получится, поэтому предлагаю рассмотреть один из ряда провайдеров курсов валют – https://openexchangerates.org/
Минимальный необходимый план Developer включает в себя:
  • 10.000 запросов ежемесячно (более чем достаточно)
  • Ежечасные внутридневные обновления курсов
  • Широкий набор базовых валют, включая криптовалюты
Доступные методы API:
Для получения актуальных курсов валют воспользуемся API endpoint /latest.jsonПростой запрос-ответ может выглядеть следующим образом:
Установка на расписание в AirflowДля регулярного получения актуальных курсов валют я воспользуюсь инструментом Airflow. Apache Airflow – де-факто стандарт в области оркестрации данных, data engineering и управления пайплайнами. Смысловая составляющая графа задачи (DAG):
  • Сделать запрос к API
  • Сохранить полученный ответ (например, в виде уникального ключа на S3)
  • Уведомить в Slack в случае ошибки
Конфигурация DAG:
  • Базовые валюты (base currency), от которых отсчитываем курсы
  • Синхронизация расписание запусков с расчетом витрин в Хранилище Данных
  • Токен доступа к сервису
Самый простой DAG состоит из одного таска с вызовом простого shell-скрипта:
TS=`date +"%Y-%m-%d-%H-%M-%S-%Z"`
curl -H "Authorization: Token $OXR_TOKEN" \
"https://openexchangerates.org/api/historical/$BUSINESS_DT.json?base=$BASE_CURRENCY&symbols=$SYMBOLS" \
| aws s3 cp - s3://$BUCKET/$BUCKET_PATH/$BUSINESS_DT-$BASE_CURRENCY-$TS.json
Вот как выглядит результат регулярной работы скрипта в S3:
Сегодня в штатном режиме выполняется около 25 обращений к сервису в сутки, статистика выглядит следующим образом:
Выгрузка истории по новым валютамПосле обеспечения регулярной выгрузки всех необходимых валют, можно приступить к формированию истории по новым базовым валютам (которой, очевидно, нет). Это позволит переводить в новые валюты суммы транзакций прошлых периодов.К сожалению, план Developer не включает обращения к API endpoint /time-series.json, и только ради этой разовой задачи не имеет смысла делать upgrade на более дорогостоящую версию.Воспользуемся методом /historical/*.json и простым опросом API в цикле для формирования исторической выгрузки:
#!/bin/bash
d=2011-01-01
while [ "$d" != 2021-02-19 ]; do
echo $d
curl -H "Authorization: Token $TOKEN" "https://openexchangerates.org/api/historical/$d.json?base=AED&symbols=AED,GBP,EUR,RUB,USD" > ./export/$d.json
d=$(date -j -v +1d -f "%Y-%m-%d" $d +%Y-%m-%d)
done
Пиковая нагрузка вызвала вопросы у коллег, которые тоже пользуются сервисом, но это была разовая акция:
Архивирование исторических курсов валютВся история обменных курсов полученная из legacy-источника ЦБ РФ до даты X (перехода на новый сервис-провайдер) подлежит архивированию в неизменном виде.Я хочу сохранить все те курсы, которые мы показывали в своих аналитических инструментах без изменений. То есть чтобы суммы в дашбордах и отчетах бизнес-пользователей не были изменены ни на копейку.Для этого я выполню выгрузку накопленных значений обменных курсов за весь исторический период в Data Lake. Более детально, я произведу:
  • Трансформацию legacy pivot-таблицы в двумерную
  • Запись в колоночный формат PARQUET в AWS S3
Формирование архива в S3 в формате PARQUET
CREATE EXTERNAL TABLE spectrum.currencies_cbrf
STORED AS PARQUET
LOCATION 's3://<BUCKET>/dwh/currencies_cbrf/' AS
WITH base AS (
   SELECT 'EUR' AS base_currency
   UNION ALL
   SELECT 'GBP'
   UNION ALL
   SELECT 'RUB'
   UNION ALL
   SELECT 'USD'
)
SELECT
   "day" AS business_dt
   ,b.base_currency
   ,CASE b.base_currency
       WHEN 'EUR' THEN 1
       WHEN 'GBP' THEN gbp_to_eur
       WHEN 'RUB' THEN rub_to_eur
       WHEN 'USD' THEN usd_to_eur
       ELSE NULL
     END AS eur
   ,CASE b.base_currency
       WHEN 'EUR' THEN eur_to_gbp
       WHEN 'GBP' THEN 1
       WHEN 'RUB' THEN rub_to_gbp
       WHEN 'USD' THEN usd_to_gbp
       ELSE NULL
     END AS gbp
   ,CASE b.base_currency
       WHEN 'EUR' THEN eur_to_rub
       WHEN 'GBP' THEN gbp_to_rub
       WHEN 'RUB' THEN 1
       WHEN 'USD' THEN usd_to_rub
       ELSE NULL
     END AS rub
   ,CASE b.base_currency
       WHEN 'EUR' THEN eur_to_usd
       WHEN 'GBP' THEN gbp_to_usd
       WHEN 'RUB' THEN rub_to_usd
       WHEN 'USD' THEN 1
       ELSE NULL
     END AS usd
FROM ext.currencies c
   CROSS JOIN base b
;
Таким образом, в хранилище S3 у меня теперь есть статический снимок всех обменных курсов, когда-либо использованных в аналитических приложениях, сериализованный в оптимизированный колоночный формат со сжатием. В случае необходимости пересчета витрин и исторических данных я запросто смогу воспользоваться этими курсами.Доступ к данным из DWH через S3 External TableА теперь самое интересное – из своего аналитического движка Amazon Redshift я хочу иметь возможность просто и быстро обращаться к самым актуальным курсам валют, использовать их в своих трансформациях.Оптимальное решение – создание внешних таблиц EXTERNAL TABLE, которые обеспечивают SQL-доступ к данным, хранящимся в S3. При этом нам доступно чтение полуструктурированных данных в формате JSON, бинарных данных в форматах AVRO, ORC, PARQUET и другие опции. Продукт имеет название Redshift Spectrum и тесно связан с SQL-движком Amazon Athena, который имеет много общего с Presto.
CREATE EXTERNAL TABLE IF NOT EXISTS spectrum.currencies_oxr (
   "timestamp" bigint
   , base varchar(3)
   , rates struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>
)
ROW format serde 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://<BUCKET>/dwh/currencies/'
;
Обратите внимание на обращение ко вложенному документу rates с помощью создания типа данных struct.Теперь добавим к этой задаче секретную силу dbt. Модуль dbt-external-tables позволяет автоматизировать создание EXTERNAL TABLES и зарегистрировать их в качестве источников данных:
- name: external
     schema: spectrum
     tags: ["spectrum"]
     loader: S3
     description: "External data stored in S3 accessed vith Redshift Spectrum"
     tables:
       - name: currencies_oxr
         description: "Currency Exchange Rates fetched from OXR API https://openexchangerates.org"
         freshness:
           error_after: {count: 15, period: hour}
         loaded_at_field: timestamp 'epoch' + "timestamp" * interval '1 second'
         external:
           location: "s3://<BUCKET>/dwh/currencies/"
           row_format: "serde 'org.openx.data.jsonserde.JsonSerDe'"
         columns:
           - name: timestamp
             data_type: bigint
           - name: base
             data_type: varchar(3)
           - name: rates
             data_type: struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>
Немаловажным элементом является проверка своевременности данных – source freshness test на курсы валют. Тем самым мы будем постоянно держать руку на пульсе поступления актуальных данных в Хранилище. Очень важно рассчитывать все финансовые метрики корректно и в срок, а без актуальных значений курсов задачу решить невозможно.В случае отставания данных – более 15 часов без свежих обменных курсов – мы тут же получаем уведомление в Slack.Для прозрачности и простоты пользователей объединим исторические данные (архив) и постоянно поступающие актуальные курсы (новый API) в одну модель currencies:Объединение исторических и новых данных в единый справочник
{{
   config(
       materialized='table',
       dist='all',
       sort=["business_dt", "base_currency"]
   )
}}
with cbrf as (
select
     business_dt
   , null as business_ts
   , base_currency
   , aed
   , eur
   , gbp
   , rub
   , usd
from {{ source('external', 'currencies_cbrf') }}
where business_dt <= '2021-02-18'
),
oxr_all as (
   select
     (timestamp 'epoch' + o."timestamp" * interval '1 second')::date as business_dt
   , (timestamp 'epoch' + o."timestamp" * interval '1 second') as business_ts
   , o.base as base_currency
   , o.rates.aed::decimal(10,4) as aed
   , o.rates.eur::decimal(10,4) as eur
   , o.rates.gbp::decimal(10,4) as gbp
   , o.rates.rub::decimal(10,4) as rub
   , o.rates.usd::decimal(10,4) as usd
   , row_number() over (partition by base_currency, business_dt order by business_ts desc) as rn
   from {{ source('external', 'currencies_oxr') }} as o
   where business_dt > '2021-02-18'
),
oxr as (
select
     business_dt
   , business_ts
   , base_currency
   , aed
   , eur
   , gbp
   , rub
   , usd
from {{ ref('stg_currencies_oxr_all') }}
where rn = 1
),
united as (
select
     business_dt
   , business_ts
   , base_currency
   , aed
   , eur
   , gbp
   , rub
   , usd
  from cbrf
union all
select
     business_dt
   , business_ts
   , base_currency
   , aed
   , eur
   , gbp
   , rub
   , usd
  from oxr
)
select
   business_dt
, business_ts
, base_currency
, aed
, eur
, gbp
, rub
, usd
from united
При этом физически справочник с курсами валют копируется на каждую ноду аналитического кластера Redshift и хранится в отсортированном по дате и базовой валюте  виде для ускорения работы запросов.Использование курсов в моделировании данныхВ целом, работа с курсами валют для аналитиков и инженеров, которые развивают Хранилище Данных не изменилась и осталась весьма простой. Все детали использования нового API, обращения к внешним полу-структурированным документам JSON в S3, объединению с архивными данными скрыты  . В своих трансформациях достаточно сделать простой джоин на таблицу с курсами валют:
select
       -- price_details
       , r.currency
       , {{ convert_currency('price', 'currency') }}
       , {{ convert_currency('discount', 'currency') }}
       , {{ convert_currency('insurance', 'currency') }}
       , {{ convert_currency('tips', 'currency') }}
       , {{ convert_currency('parking', 'currency') }}
       , {{ convert_currency('toll_road', 'currency') }}
   from {{ ref('requests') }} r
       left join {{ ref('stg_currencies') }} currencies on r.completed_dt_utc = currencies.business_dt
           and r.currency = currencies.base_currency
Сами метрики конвертируются при помощи простого макроса, который на вход принимает колонку с исходной суммой и колонку с исходным кодом валюты:
-- currency conversion macro
{% macro convert_currency(convert_column, currency_code_column) -%}
     ( {{ convert_column }} * aed )::decimal(18,4) as {{ convert_column }}_aed
   , ( {{ convert_column }} * eur )::decimal(18,4) as {{ convert_column }}_eur
   , ( {{ convert_column }} * gbp )::decimal(18,4) as {{ convert_column }}_gbp
   , ( {{ convert_column }} * rub )::decimal(18,4) as {{ convert_column }}_rub
   , ( {{ convert_column }} * usd )::decimal(18,4) as {{ convert_column }}_usd
{%- endmacro %}
Практико-ориентированное развитиеРабота с данными – одно из наиболее востребованных и бурно развивающихся направлений. Каждый день я нахожу новые интересные задачи и придумываю решения для них. Это захватывающий и интересный путь, расширяющий горизонты.В конце мая состоится юбилейный запуск курса Data Engineer в OTUS, в котором я принимаю участие в роли преподавателя.По прошествии двух лет программа постоянно менялась, адаптировалась. Ближайший запуск принесет ряд нововведений и будет построен вокруг кейсов – реальных прикладных проблем инженеров:
  • Data Architecture
  • Data Lake
  • Data Warehouse
  • NoSQL / NewSQL
  • MLOps
Детально с программой можно ознакомиться на лендинге курса.Также я делюсь своими авторскими заметками и планами в телеграм-канале Technology Enthusiast.Благодарю за внимание.
===========
Источник:
habr.com
===========

Похожие новости: Теги для поиска: #_big_data, #_data_engineering, #_dbt, #_currency, #_airflow, #_sql, #_analytics, #_blog_kompanii_otus (
Блог компании OTUS
)
, #_big_data, #_data_engineering
Профиль  ЛС 
Показать сообщения:     

Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы

Текущее время: 22-Ноя 07:33
Часовой пояс: UTC + 5