[Data Mining, Big Data, Аналитика мобильных приложений, Управление продуктом] Хочу всё знать о клиенте! Или как обогатить сухие факты DWH цифровыми путями и свойствами клиента из Amplitude
Автор
Сообщение
news_bot ®
Стаж: 6 лет 9 месяцев
Сообщений: 27286
Корпоративное хранилище в Лиге Ставок было создано задолго до внедрения Amplitude. Преимущественно им пользуются аналитики и исследователи. Продакты и маркетологи для получения аналитических данных из хранилища обращались к аналитикам, потому что это требует навыков программирования.
Фактам DWH всегда не хватало чего-то продуктового, цифрового зрения в продуктах, которое подглядывало бы за клиентами и давало нам понимание его путей. С появлением Amplitude в компании мы начали понимать ценность накапливаемых данных в системе и очень круто использовать их в самой Amplitude, но симбиоз двух систем DWH и Amplitude не давал покоя. Мы, конечно же, реализовали механику переливки данных из Amplitude для in-house анализа в корпоративном хранилище и сделали инструкцию по настройке передачи данных из Amplitude в DWH. А также приглашаем вас на вебинар Лиги Ставок и Adventum про анализ и оптимизацию конверсии в продукте.
Чем может помочь объединение данных в DWH
1. Финансы. Финансовые данные хранятся только в DWH, поэтому при добавлении к ним информации о пользовательском пути возможно рассчитать расходы и доходы по любой когорте пользователей.
2. Формирование персональных рекомендаций. Общие данные по отдельному пользователю служат основой для системы персональных рекомендаций в продукте.
3. Профилактика фрода. Выявление клиентов, которые проявляют активность напрямую в API и не отображаются в разметке на фронтенде. Поиск роботов и подозрительных сервисов для дальнейшего анализа их поведения.
Инструкция по передаче данных из Amplitude в DWH
У Amplitude есть свой API выдачи сырых данных по проекту. Подробную документацию по нему можно найти по ссылке. Он реализован очень продумано. Например, если запросить данные за последний час, но этот час еще не закрыт в системе, то вы ничего не получите в ответ. Отличная защита от пробелов в данных. Сначала мы неприятно удивились, пока не поняли, что время событий в UTC — и пока час не закроется, данных мы не получим.
Механика. Агент доставки данных в Лиге Ставок написан на Python, работает в связке с SQL базой и вдохновлен этим туториалом. Спасибо автору! Мы отправляем в Amplitude события с сайта и мобильных приложений, поэтому даже за один час собирается большой объем данных.
При этом важно получать данные оперативно, поэтому была реализована очередь — при окончании каждого часа данные импортируются из Amplitude и сразу заливаются в хранилище. Без промежуточных сохранений, к примеру в CSV, и отложенных ETL обработок.
ETL — Extract, Transform, Load. Это системы корпоративного класса, которые применяются, чтобы привести к одним справочникам и загрузить в DWH данные из нескольких разных учетных систем.
Код. Мы намеренно пишем простые агенты, понятные даже начинающим аналитикам. Поэтому никаких супер фич, только код, нацеленный на выполнение поставленной задачи.
В коде используется Python 3.7 и выше. Например, если у вас нет крутых flow-систем с очередями работы агентов (или, как их вернее назвать, dag), то можно сделать простое расписание, даже на Windows. В таком случае скрипт выполняется с штатным расписанием в планировщике через .bat файл (вызов питона с указанием пути к вашему скрипту). Это позволяет быстро добиться результата и проще администрировать, даже начинающим аналитикам.
Шаг 1. Импорт библиотек
# Библиотеки
import os
import requests
import pandas as pd
import zipfile
import gzip
import time
from tqdm import tqdm
import pymssql
import datetime
import pyodbc
from urllib.parse import quote_plus
from sqlalchemy import create_engine, event
Шаг 2. Блок переменных
В процессе выполнения принтуется каждый шаг для понимания, где просело время обработки, поэтому нужны таймеры. Каталог объявляется для того, чтобы архив полученных от сервиса данных физически складывался в нужной нам папке.
# Переменные и константы
os.chdir("C:\Agents\AMPL\TEMP") # Каталог хранения архива амплитуды
dts1 = time.time() # Общий Таймер для подсчета времени работы шагов
a = time.time() # финальный таймер
now = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") # формат для имени архива
Шаг 3. Подключение API Amplitude
Для этого достаточно указать два ключа, которые вы найдете в настройках проекта (Settings => Project = > General).
# Указываем ключики к API амплитуды
api_key = 'ключик'
secret_key = 'еще один ключик'
Шаг 4. Подключение к базе данных (БД) и получение отсечки
В начале мы упоминали, что пошли по пути очереди и забираем данные за каждый час, от последней имеющейся. Очередь ведется в SQL базе, где мы фиксируем последнюю полученную временную отсечку от Амплитуды. Поэтому дальше указан блок подключения к БД и получения этой самой отсечки в формате yyyymmddThh (т. е. какой час даты мы хотим сейчас забрать). Далее при обращении к API мы так и передаем одну дату и время, как начало и конец периода.
# Переменные для подключения к DWH (SQL)
server = "Имя сервера"
user = "логин"
password = "пароль"
# Подключаемся и получаем отсечку очереди
conn = pymssql.connect(server, user, password, "Имя БД")
cursor = conn.cursor()
cursor.execute("Запрос на получение отсечки. Процедура или простой select")
Шаг 5. Преобразование отсечки в переменную
На этом этапе берется полученная от БД отсечка и оборачивается в переменную для дальнейшей передачи в API Amplitude. После этого закрывается коннект к БД.
# Берем в переменую дату и время отсечки очереди
for row in cursor:
dt = row[0]
conn.close()
Шаг 6. Настройка архива
Далее заранее указываем имя архива скачанного из Амплитуды в нужном формате, который потребуется. Мы пишем проект, добавляя дату и час, который забрали, и время, когда выполняли запрос.
# Генерируем имя архива, закидываем временные метки в название
filename = 'AMPL_PROD_'+ dt + '_' + now
# Существующая папка, путь должен оканчиваться на \\ для WIN
# В рабочую папку будут сохраняться файлы, мы ее объявили выше как os.chdir
working_dir = os.getcwd() + '\\'
Шаг 7. Дублирование подключения к SQL
Дублируем подключение к SQL. Для нас это просто разные точки входа, поэтому повторяем подключения к нужной БД в которую будем дальше заливать данные Амплитуды.
# Настройки подключения к DWH (SQL). Дублируем, на случай если точка заливки отличается от точки получения отсечки очереди
server = 'имя сервера'
database = 'База данных'
schema = 'схема таблицы'
table_to_export = 'Название таблицы'
# Параметры подключения к DWH (SQL)
params = 'DRIVER= {SQL Server};SERVER='+server+';DATABASE='+database+';User='+user+';Password='+password+';'
quoted = quote_plus(params)
new_con = 'mssql+pyodbc:///?odbc_connect={}'.format(quoted)
Шаг 8. Получение данных от Amplitude
Здесь начинается большой блок получения данных от Amplitude в виде архива, его сохранения, распаковки и парсинга json в датафрейм.
# Блок получения данных от API амплитуды, сохранения архива и парсинга json
class GetFile():
def __init__(self, now, dt, api_key, secret_key, filename, working_dir):
self.now = now
self.dt = dt
self.api_key = api_key
self.secret_key = secret_key
self.filename = filename
self.working_dir = working_dir
def response(self):
"""
Запрос файла с сервера
"""
print('Отправка запроса на сервер!', end='\n')
count = 0
while True:
count += 1
print(f'Попытка {count}.....', end='')
try:
response = requests.get('https://amplitude.com/api/2/export?start='+self.dt+'&end='+self.dt,
auth=(self.api_key, self.secret_key),
timeout=10)
print('успешно', end='\n')
print('1. Ответ от сервера получен', end='\n')
break
except:
print('неудачно', end='\n')
time.sleep(1)
return response
def download(self):
'''
Скачивание архива с данными
'''
with open(working_dir + self.filename + '.zip', "wb") as code:
file = self.response()
print('2. Скачивание архива с файлами.....', end='')
code.write(file.content)
print('OK', end='\n')
def extraction(self):
'''
Извлечение файлов в папку на компьютере
'''
z = zipfile.ZipFile(self.working_dir + self.filename + '.zip', 'r')
z.extractall(path=self.working_dir + self.filename)
print('3. Архив с файлами извлечен и записан в папку ' + self.filename)
def getfilename(self):
'''
Информация о файле и пути
'''
return 'Файл: {} \nПолный путь: {}'.format(self.filename, self.working_dir + self.filename + '.zip')
def unzip_and_create_df(working_dir, filename):
'''
Распаковка JSON.gz и преобразование json к обычному табличному формату (конкатенация файлов по индексу)
Индексы дублируются, но они не нужны.
'''
directory = working_dir + filename + '\\274440'
files = os.listdir(directory)
df = pd.DataFrame()
print('Прогресс обработки файлов:')
time.sleep(1)
for i in tqdm(files):
with gzip.open(directory + '\\' + i) as f:
add = pd.read_json(f, lines=True)
df = pd.concat([df, add], axis=0)
time.sleep(1)
print('4. JSON файлы из архива успешно преобразованы и записаны в dataframe')
return df
# Создание класса загрузки файла
file = GetFile(now, dt, api_key, secret_key, filename, working_dir)
# Загрузка файла (уже включает в себя обращение на сервер)
file.download()
# Извлечение gz-файлов в одноимённую папку
file.extraction()
# Создание общего DataFrame на базе распакованных json.gz
adf = unzip_and_create_df(working_dir, filename)
Шаг 9. Маппинг нужных столбцов (опционально)
Итак, датафрейм мы заполнили. Ниже у нас идет маппинг нужных нам столбцов. Мы забираем не все строки, и для этого сначала получаем их из настроечных таблиц SQL. Этот шаг в целом можно пропускать и забирать все.
# Фиксируем этап и статус
print('5. Считывание данных с БД, настройка связей, чистка, обработка.....', end='')
# Запрос к DWH для получения списка нужных столбцов и их типов
# Если нужно работать со всем массивом - пропускайте этот пункт
sql_query_columns = ("""
'Тут запрос какие поля нам нужны и как их переименовать при импорте в БД'
""")
settdf = pd.read_sql_query(sql_query_columns, new_con)
# Приведение к lower() строк (=названий столбцов) из столбца SAVE_COLUMN_NAME из dwh
# Переименование название столбцов, lower() для последующего мерджа с таблицей хранилища
settdf['SAVE_COLUMN_NAME'] = settdf['SAVE_COLUMN_NAME'].apply(lambda x: x.lower())
adf.columns = [''.join(j.title() for j in i.split('_')).lower() for i in adf.columns]
# Получение нужных столбцов
needed_columns = [i for i in settdf['SAVE_COLUMN_NAME'].to_list()]
# Добавление столбца в список необходимых
needed_columns.append('DOWNLOAD_FILE_NAME')
# Добавление столбца в DF c названием файла
adf['DOWNLOAD_FILE_NAME'] = filename
# Сброс индекса (для красоты, адекватности, поможет дебажить)
adf.reset_index(inplace=True)
# Перевод в юникод (текстовый формат) всех непустых значений, пустые остаются пустыми
adf = adf.astype('unicode_').where(pd.notnull(adf), None)
# Срез DataFrame для загрузки в базу
df_to_sql = adf[needed_columns]
# Дописываем статус в сточку принта
print('OK', end='\n')
Шаг 10. Отсчет лога и вставка датафрейма
На данном шаге все готово для передачи данных в таблицу хранилища. Для начала важно настроить отсчет лога и вставить курсором весь датафрейм.
# Блок заливки в DWH
# Начало лога блока
dts2 = time.time()
print('6. Заливка данных в БД...', end='')
# Подключаемся по объявленным параметрам к DWH
connection = pyodbc.connect(params)
engine = create_engine(new_con)
# Вставка курсором (батчами) сета в тбл DWH (в нашем случае - нужных столбцов)
@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
if executemany:
cursor.fast_executemany = True
# При None раздувает RAM
df_to_sql.to_sql(table_to_export, engine, schema=schema, if_exists='append', chunksize=100000, index=False)
# Закрываем логи и коннект
connection.close()
print('OK', end='\n')
dtf = time.time()
diff1, diff2 = [str(int((dtf - i) // 60)) + ' мин ' + str(int((dtf - i) % 60)) + ' сек' for i in (dts1, dts2)]
print(f'Общее время: {diff1}, Заливка: {diff2}')
print('Загрузка завершена, файл выполнил работу')
Шаг 11. Фиксирование успешной передачи
Поздравляем! Данные успешно переданы в таблицу. Теперь нужно зафиксировать успешную заливку сдвигом отсечки очереди для следующего запуска.
# Фиксируем успешную обработку очереди
# Открываем коннект и формируем запрос
conn2 = pymssql.connect(server, user, password, "Имя базы данных")
cursor2 = conn2.cursor()
query = "Инсерт лога в очередь или запуск процедуры, как удобно")
# выполняем запрос
cursor2.execute(query)
# Закрываем логи и коннект
conn2.commit()
conn2.close()
Шаг 12. Запуск процесса обработки связанных витрин
Все, очередь зафиксировали. Следующий запуск зальет данные с новой временной отсечкой. Но помните, что нам важно забирать данные относительно оперативно? Чтобы не тратить время, мы сообщаем всему остальному ETL о том, что мы получили данные и запускаем процесс обработки других различных связанных витрин.
print('Мердж в ДВХ')
# В нашем случае мы еще запускаем ETL обработчик новых данных для оперативного использования витрины
conn3 = pymssql.connect(server, user, password, "Имя базы данных")
cursor3 = conn3.cursor()
query = "Запуск ETL процедур дальнейшего обновления. Например EXEC dbo.SP"
cursor3.execute(query)
conn3.commit()
conn3.close()
Шаг 13. Фиксация времени исполнения
Последний шаг, на котором работу можно считать законченной.
# Фиксируем общий лог и время работы
b = time.time()
diff = b-a
minutes = diff//60
print('Выполнение кода заняло: {:.0f} минут(ы)'.format( minutes))
Следующий запуск агента возьмет новый часовой срез и выполнит очередную загрузку. Если при работе агента произойдет сбой — очередь не фиксируется, и весь процесс повторится заново.
Также в последнем шаге, при вызове ETL, имеются проверки на дублирование данных и их полноту. При необходимости автоматика сама вернет отсечку до сбоя, даже если частично что-то залилось, и проверит наличие дублей.
И конечно же, помимо получения данных из Amplitude, их можно обогащать непосредственно в сервисе продуктовой аналитики. У нас реализована межсерверная s2s доставка данных, о который мы расскажем в будущих обновлениях спецпроекта.
Приходите на вебинар про анализ и оптимизацию в продукте 20 мая в 17:00 МСК. Участие бесплатное. Расскажем и покажем, как повысить конверсию с помощью Амплитуды.
===========
Источник:
habr.com
===========
Похожие новости:
- [Big Data] Нетология совместно с Yandex.Cloud запустила бесплатный курс по визуализации данных
- [Анализ и проектирование систем] Аналитик в автоматизации — кто он и чем занимается
- [Python, Data Mining, Big Data, R, Визуализация данных] Оценка кредитного портфеля на R
- [Big Data, Data Engineering] Курсы валют и аналитика – использование обменных курсов в Хранилище Данных
- [Программирование, Анализ и проектирование систем, Аналитика мобильных приложений] Какие ошибки совершает аналитик в первые полгода работы и как их избежать
- [Big Data, Законодательство в IT, Финансы в IT] Минцифры предлагает продавать стартапам доступ к государственным данным
- [Аналитика мобильных приложений] Fix Price запустил мобильное приложение с обновленным функционалом для Android
- [Управление продуктом, Управление медиа, Копирайт, Криптовалюты] Fox выпустит следующий мультфильм создателя «Рика и Морти» на блокчейне и с NFT
- [Управление проектами, Управление продуктом] Так Product или Project Manager?
- [Service Desk, Управление проектами, Управление продуктом, IT-компании] Доставка, которая доставляет: субъективные мысли и выводы простого клиента
Теги для поиска: #_data_mining, #_big_data, #_analitika_mobilnyh_prilozhenij (Аналитика мобильных приложений), #_upravlenie_produktom (Управление продуктом), #_liga_stavok (лига ставок), #_prodaktmenedzhment (продакт-менеджмент), #_prodaktmenedzher (продакт-менеджер), #_produktovaja_analitiki (продуктовая аналитики), #_analitika (аналитика), #_analiz_dannyh (анализ данных), #_analitika_mobilnyh_prilozhenij (аналитика мобильных приложений), #_blog_kompanii_liga_stavok (
Блог компании Лига Ставок
), #_data_mining, #_big_data, #_analitika_mobilnyh_prilozhenij (
Аналитика мобильных приложений
), #_upravlenie_produktom (
Управление продуктом
)
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 14:58
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
Корпоративное хранилище в Лиге Ставок было создано задолго до внедрения Amplitude. Преимущественно им пользуются аналитики и исследователи. Продакты и маркетологи для получения аналитических данных из хранилища обращались к аналитикам, потому что это требует навыков программирования. Фактам DWH всегда не хватало чего-то продуктового, цифрового зрения в продуктах, которое подглядывало бы за клиентами и давало нам понимание его путей. С появлением Amplitude в компании мы начали понимать ценность накапливаемых данных в системе и очень круто использовать их в самой Amplitude, но симбиоз двух систем DWH и Amplitude не давал покоя. Мы, конечно же, реализовали механику переливки данных из Amplitude для in-house анализа в корпоративном хранилище и сделали инструкцию по настройке передачи данных из Amplitude в DWH. А также приглашаем вас на вебинар Лиги Ставок и Adventum про анализ и оптимизацию конверсии в продукте. Чем может помочь объединение данных в DWH 1. Финансы. Финансовые данные хранятся только в DWH, поэтому при добавлении к ним информации о пользовательском пути возможно рассчитать расходы и доходы по любой когорте пользователей. 2. Формирование персональных рекомендаций. Общие данные по отдельному пользователю служат основой для системы персональных рекомендаций в продукте. 3. Профилактика фрода. Выявление клиентов, которые проявляют активность напрямую в API и не отображаются в разметке на фронтенде. Поиск роботов и подозрительных сервисов для дальнейшего анализа их поведения. Инструкция по передаче данных из Amplitude в DWH У Amplitude есть свой API выдачи сырых данных по проекту. Подробную документацию по нему можно найти по ссылке. Он реализован очень продумано. Например, если запросить данные за последний час, но этот час еще не закрыт в системе, то вы ничего не получите в ответ. Отличная защита от пробелов в данных. Сначала мы неприятно удивились, пока не поняли, что время событий в UTC — и пока час не закроется, данных мы не получим. Механика. Агент доставки данных в Лиге Ставок написан на Python, работает в связке с SQL базой и вдохновлен этим туториалом. Спасибо автору! Мы отправляем в Amplitude события с сайта и мобильных приложений, поэтому даже за один час собирается большой объем данных. При этом важно получать данные оперативно, поэтому была реализована очередь — при окончании каждого часа данные импортируются из Amplitude и сразу заливаются в хранилище. Без промежуточных сохранений, к примеру в CSV, и отложенных ETL обработок. ETL — Extract, Transform, Load. Это системы корпоративного класса, которые применяются, чтобы привести к одним справочникам и загрузить в DWH данные из нескольких разных учетных систем. Код. Мы намеренно пишем простые агенты, понятные даже начинающим аналитикам. Поэтому никаких супер фич, только код, нацеленный на выполнение поставленной задачи. В коде используется Python 3.7 и выше. Например, если у вас нет крутых flow-систем с очередями работы агентов (или, как их вернее назвать, dag), то можно сделать простое расписание, даже на Windows. В таком случае скрипт выполняется с штатным расписанием в планировщике через .bat файл (вызов питона с указанием пути к вашему скрипту). Это позволяет быстро добиться результата и проще администрировать, даже начинающим аналитикам. Шаг 1. Импорт библиотек # Библиотеки
import os import requests import pandas as pd import zipfile import gzip import time from tqdm import tqdm import pymssql import datetime import pyodbc from urllib.parse import quote_plus from sqlalchemy import create_engine, event Шаг 2. Блок переменных В процессе выполнения принтуется каждый шаг для понимания, где просело время обработки, поэтому нужны таймеры. Каталог объявляется для того, чтобы архив полученных от сервиса данных физически складывался в нужной нам папке. # Переменные и константы
os.chdir("C:\Agents\AMPL\TEMP") # Каталог хранения архива амплитуды dts1 = time.time() # Общий Таймер для подсчета времени работы шагов a = time.time() # финальный таймер now = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") # формат для имени архива Шаг 3. Подключение API Amplitude Для этого достаточно указать два ключа, которые вы найдете в настройках проекта (Settings => Project = > General). # Указываем ключики к API амплитуды
api_key = 'ключик' secret_key = 'еще один ключик' Шаг 4. Подключение к базе данных (БД) и получение отсечки В начале мы упоминали, что пошли по пути очереди и забираем данные за каждый час, от последней имеющейся. Очередь ведется в SQL базе, где мы фиксируем последнюю полученную временную отсечку от Амплитуды. Поэтому дальше указан блок подключения к БД и получения этой самой отсечки в формате yyyymmddThh (т. е. какой час даты мы хотим сейчас забрать). Далее при обращении к API мы так и передаем одну дату и время, как начало и конец периода. # Переменные для подключения к DWH (SQL)
server = "Имя сервера" user = "логин" password = "пароль" # Подключаемся и получаем отсечку очереди conn = pymssql.connect(server, user, password, "Имя БД") cursor = conn.cursor() cursor.execute("Запрос на получение отсечки. Процедура или простой select") Шаг 5. Преобразование отсечки в переменную На этом этапе берется полученная от БД отсечка и оборачивается в переменную для дальнейшей передачи в API Amplitude. После этого закрывается коннект к БД. # Берем в переменую дату и время отсечки очереди
for row in cursor: dt = row[0] conn.close() Шаг 6. Настройка архива Далее заранее указываем имя архива скачанного из Амплитуды в нужном формате, который потребуется. Мы пишем проект, добавляя дату и час, который забрали, и время, когда выполняли запрос. # Генерируем имя архива, закидываем временные метки в название
filename = 'AMPL_PROD_'+ dt + '_' + now # Существующая папка, путь должен оканчиваться на \\ для WIN # В рабочую папку будут сохраняться файлы, мы ее объявили выше как os.chdir working_dir = os.getcwd() + '\\' Шаг 7. Дублирование подключения к SQL Дублируем подключение к SQL. Для нас это просто разные точки входа, поэтому повторяем подключения к нужной БД в которую будем дальше заливать данные Амплитуды. # Настройки подключения к DWH (SQL). Дублируем, на случай если точка заливки отличается от точки получения отсечки очереди
server = 'имя сервера' database = 'База данных' schema = 'схема таблицы' table_to_export = 'Название таблицы' # Параметры подключения к DWH (SQL) params = 'DRIVER= {SQL Server};SERVER='+server+';DATABASE='+database+';User='+user+';Password='+password+';' quoted = quote_plus(params) new_con = 'mssql+pyodbc:///?odbc_connect={}'.format(quoted) Шаг 8. Получение данных от Amplitude Здесь начинается большой блок получения данных от Amplitude в виде архива, его сохранения, распаковки и парсинга json в датафрейм. # Блок получения данных от API амплитуды, сохранения архива и парсинга json
class GetFile(): def __init__(self, now, dt, api_key, secret_key, filename, working_dir): self.now = now self.dt = dt self.api_key = api_key self.secret_key = secret_key self.filename = filename self.working_dir = working_dir def response(self): """ Запрос файла с сервера """ print('Отправка запроса на сервер!', end='\n') count = 0 while True: count += 1 print(f'Попытка {count}.....', end='') try: response = requests.get('https://amplitude.com/api/2/export?start='+self.dt+'&end='+self.dt, auth=(self.api_key, self.secret_key), timeout=10) print('успешно', end='\n') print('1. Ответ от сервера получен', end='\n') break except: print('неудачно', end='\n') time.sleep(1) return response def download(self): ''' Скачивание архива с данными ''' with open(working_dir + self.filename + '.zip', "wb") as code: file = self.response() print('2. Скачивание архива с файлами.....', end='') code.write(file.content) print('OK', end='\n') def extraction(self): ''' Извлечение файлов в папку на компьютере ''' z = zipfile.ZipFile(self.working_dir + self.filename + '.zip', 'r') z.extractall(path=self.working_dir + self.filename) print('3. Архив с файлами извлечен и записан в папку ' + self.filename) def getfilename(self): ''' Информация о файле и пути ''' return 'Файл: {} \nПолный путь: {}'.format(self.filename, self.working_dir + self.filename + '.zip') def unzip_and_create_df(working_dir, filename): ''' Распаковка JSON.gz и преобразование json к обычному табличному формату (конкатенация файлов по индексу) Индексы дублируются, но они не нужны. ''' directory = working_dir + filename + '\\274440' files = os.listdir(directory) df = pd.DataFrame() print('Прогресс обработки файлов:') time.sleep(1) for i in tqdm(files): with gzip.open(directory + '\\' + i) as f: add = pd.read_json(f, lines=True) df = pd.concat([df, add], axis=0) time.sleep(1) print('4. JSON файлы из архива успешно преобразованы и записаны в dataframe') return df # Создание класса загрузки файла file = GetFile(now, dt, api_key, secret_key, filename, working_dir) # Загрузка файла (уже включает в себя обращение на сервер) file.download() # Извлечение gz-файлов в одноимённую папку file.extraction() # Создание общего DataFrame на базе распакованных json.gz adf = unzip_and_create_df(working_dir, filename) Шаг 9. Маппинг нужных столбцов (опционально) Итак, датафрейм мы заполнили. Ниже у нас идет маппинг нужных нам столбцов. Мы забираем не все строки, и для этого сначала получаем их из настроечных таблиц SQL. Этот шаг в целом можно пропускать и забирать все. # Фиксируем этап и статус
print('5. Считывание данных с БД, настройка связей, чистка, обработка.....', end='') # Запрос к DWH для получения списка нужных столбцов и их типов # Если нужно работать со всем массивом - пропускайте этот пункт sql_query_columns = (""" 'Тут запрос какие поля нам нужны и как их переименовать при импорте в БД' """) settdf = pd.read_sql_query(sql_query_columns, new_con) # Приведение к lower() строк (=названий столбцов) из столбца SAVE_COLUMN_NAME из dwh # Переименование название столбцов, lower() для последующего мерджа с таблицей хранилища settdf['SAVE_COLUMN_NAME'] = settdf['SAVE_COLUMN_NAME'].apply(lambda x: x.lower()) adf.columns = [''.join(j.title() for j in i.split('_')).lower() for i in adf.columns] # Получение нужных столбцов needed_columns = [i for i in settdf['SAVE_COLUMN_NAME'].to_list()] # Добавление столбца в список необходимых needed_columns.append('DOWNLOAD_FILE_NAME') # Добавление столбца в DF c названием файла adf['DOWNLOAD_FILE_NAME'] = filename # Сброс индекса (для красоты, адекватности, поможет дебажить) adf.reset_index(inplace=True) # Перевод в юникод (текстовый формат) всех непустых значений, пустые остаются пустыми adf = adf.astype('unicode_').where(pd.notnull(adf), None) # Срез DataFrame для загрузки в базу df_to_sql = adf[needed_columns] # Дописываем статус в сточку принта print('OK', end='\n') Шаг 10. Отсчет лога и вставка датафрейма На данном шаге все готово для передачи данных в таблицу хранилища. Для начала важно настроить отсчет лога и вставить курсором весь датафрейм. # Блок заливки в DWH
# Начало лога блока dts2 = time.time() print('6. Заливка данных в БД...', end='') # Подключаемся по объявленным параметрам к DWH connection = pyodbc.connect(params) engine = create_engine(new_con) # Вставка курсором (батчами) сета в тбл DWH (в нашем случае - нужных столбцов) @event.listens_for(engine, 'before_cursor_execute') def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany): if executemany: cursor.fast_executemany = True # При None раздувает RAM df_to_sql.to_sql(table_to_export, engine, schema=schema, if_exists='append', chunksize=100000, index=False) # Закрываем логи и коннект connection.close() print('OK', end='\n') dtf = time.time() diff1, diff2 = [str(int((dtf - i) // 60)) + ' мин ' + str(int((dtf - i) % 60)) + ' сек' for i in (dts1, dts2)] print(f'Общее время: {diff1}, Заливка: {diff2}') print('Загрузка завершена, файл выполнил работу') Шаг 11. Фиксирование успешной передачи Поздравляем! Данные успешно переданы в таблицу. Теперь нужно зафиксировать успешную заливку сдвигом отсечки очереди для следующего запуска. # Фиксируем успешную обработку очереди
# Открываем коннект и формируем запрос conn2 = pymssql.connect(server, user, password, "Имя базы данных") cursor2 = conn2.cursor() query = "Инсерт лога в очередь или запуск процедуры, как удобно") # выполняем запрос cursor2.execute(query) # Закрываем логи и коннект conn2.commit() conn2.close() Шаг 12. Запуск процесса обработки связанных витрин Все, очередь зафиксировали. Следующий запуск зальет данные с новой временной отсечкой. Но помните, что нам важно забирать данные относительно оперативно? Чтобы не тратить время, мы сообщаем всему остальному ETL о том, что мы получили данные и запускаем процесс обработки других различных связанных витрин. print('Мердж в ДВХ')
# В нашем случае мы еще запускаем ETL обработчик новых данных для оперативного использования витрины conn3 = pymssql.connect(server, user, password, "Имя базы данных") cursor3 = conn3.cursor() query = "Запуск ETL процедур дальнейшего обновления. Например EXEC dbo.SP" cursor3.execute(query) conn3.commit() conn3.close() Шаг 13. Фиксация времени исполнения Последний шаг, на котором работу можно считать законченной. # Фиксируем общий лог и время работы
b = time.time() diff = b-a minutes = diff//60 print('Выполнение кода заняло: {:.0f} минут(ы)'.format( minutes)) Следующий запуск агента возьмет новый часовой срез и выполнит очередную загрузку. Если при работе агента произойдет сбой — очередь не фиксируется, и весь процесс повторится заново. Также в последнем шаге, при вызове ETL, имеются проверки на дублирование данных и их полноту. При необходимости автоматика сама вернет отсечку до сбоя, даже если частично что-то залилось, и проверит наличие дублей. И конечно же, помимо получения данных из Amplitude, их можно обогащать непосредственно в сервисе продуктовой аналитики. У нас реализована межсерверная s2s доставка данных, о который мы расскажем в будущих обновлениях спецпроекта. Приходите на вебинар про анализ и оптимизацию в продукте 20 мая в 17:00 МСК. Участие бесплатное. Расскажем и покажем, как повысить конверсию с помощью Амплитуды. =========== Источник: habr.com =========== Похожие новости:
Блог компании Лига Ставок ), #_data_mining, #_big_data, #_analitika_mobilnyh_prilozhenij ( Аналитика мобильных приложений ), #_upravlenie_produktom ( Управление продуктом ) |
|
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 14:58
Часовой пояс: UTC + 5