[Python, Data Engineering] Python API в Delta Lake — простые и надежные операции Upsert и Delete (перевод)
Автор
Сообщение
news_bot ®
Стаж: 6 лет 9 месяцев
Сообщений: 27286
Приглашаем всех желающих на бесплатный демо-урок,в рамках которого рассмотрим Ni-Fi и роль data ingestion инструментов в целом при построении систем обработки данных. А также решим простую задачку по построению пайплайна для загрузки файлов в хранилище данных с использованием Ni-Fi. Урок проведет эксперт OTUS - Егор Матешук.
А прямо сейчас традиционно делимся полезным переводом.
Delta Lake 0.4.0 включает Python API и преобразование Parquet в таблицу Delta Lake на местеМы рады объявить о релизе Delta Lake 0.4.0, в котором представлен Python API, улучшающий манипулирование и управление данными в Delta-таблицах. Ключевыми фичами этого релиза являются:
- Python API для DML и служебных операций (#89) - теперь вы можете использовать Python API для обновления(update)/удаления(delete)/слияния(merge) данных и выполнения служебных операций (а именно, vacuum и history) в таблицах Delta Lake. Они отлично подходят для создания сложных рабочих нагрузок в Python, например, операций медленно меняющихся измерений (SCD - Slowly Changing Dimension), слияния изменений данных для репликации и операций upsert из потоковых запросов. Для получения более подробной информации читайте документацию.
- Convert-to-Delta (#78) - теперь вы можете преобразовать таблицу Parquet в таблицу Delta Lake на месте без перезаписи каких-либо данных. Эта функция отлично подходит для преобразования очень больших таблиц Parquet, которые было бы довольно затратно перезаписывать в Delta-таблицу. Более того, этот процесс обратим - вы можете преобразовать таблицу Parquet в таблицу Delta Lake, поработать с ней (например, удалить или объединить) и легко преобразовать ее обратно в таблицу Parquet. Для получения более подробной информации читайте документацию.
- SQL для служебных операций - теперь вы можете использовать SQL для выполнения служебных операций vacuum и history. Смотрите документацию для получения дополнительных сведений о том, как настроить Spark для выполнения этих специфичных для Delta Lake команд SQL.
Больше информации вы можете найти в примечаниях к релизу Delta Lake 0.4.0 и в документации по Delta Lake > Удаление, обновление и слияние таблиц.Извините, данный ресурс не поддреживается. :( В этой статье мы продемонстрируем как использовать Python и новый Python API в Delta Lake 0.4.0 в контексте данных о вылетах и задержках рейсов на Apache Spark™ 2.4.3. Мы продемонстрируем, как производить операции upsert и delete, запрашивать старые версии данных с помощью механизма путешествия во времени (time travel) и подчищать старые версии данных с помощью vacuum.Начало работы с Delta LakeПакет Delta Lake доступен через параметр в --packages. В нашем примере мы также продемонстрируем возможность делать VACUUM файлов и выполнять Delta Lake SQL команды в Apache Spark. Поскольку это короткая демонстрация, мы также подключим следующие конфигурации:
- spark.databricks.delta.retentionDurationCheck.enabled=false, что позволит нам производить vacuum файлов, срок хранения которых меньше установленного по умолчанию срока в 7 дней. Эта настройка нужна только для команды SQL VACUUM.
- spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension для подключения Delta Lake SQL команд в Apache Spark; для вызовов Python API или Scala этого не требуется.
# Подключение пакета в Spark
./bin/pyspark --packages io.delta:delta-core2.11:0.4.0 --conf "spark.databricks.delta.retentionDurationCheck.enabled=false" --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"
Загрузка и сохранение наших Delta Lake данныхВ этой статье мы будем использовать набор летных данных о времени вылетов или задержках рейсов, сгенерированный на основе статистики вылетов RITA BTS; некоторые примеры этих данных в действии можно посмотреть здесь - Данные о вылетах рейсов за 2014 в d3.js Crossfilter, и здесь - Данные о вылетах и задержках рейсов в GraphFrames для Apache Spark™. Этот набор данных можно загрузить с этого репозитория на github. Начнем с чтения набора данных в pyspark.
# Переменные местонахождения
tripdelaysFilePath = "/root/data/departuredelays.csv"
pathToEventsTable = "/root/deltalake/departureDelays.delta"
# Чтение данных о задержках рейсов
departureDelays = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.csv(tripdelaysFilePath)
Затем давайте сохраним наш набор данных departureDelays в таблицу Delta Lake. Сохранив эту таблицу в хранилище Delta Lake, мы сможем воспользоваться его функциями, которые включают ACID транзакции, унифицированную пакетную обработку и потоковую передачу, а также путешествия во времени.
# Сохраняем данные о задержках рейсов в формате Delta Lake
departureDelays \
.write \
.format("delta") \
.mode("overwrite") \
.save("departureDelays.delta")
Обратите внимание, этот подход аналогичен обычному сохранению данных Parquet; вместо указания format("parquet") вы просто указываете format("delta"). Если вы заглянете в базовую файловую систему, вы заметите четыре файла, созданных для таблицы Delta Lake departureDelays.
/departureDelays.delta$ ls -l
.
..
_delta_log
part-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquet
part-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquet
part-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquet
part-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet
Отметим, что _delta_log - папка, которая содержит лог транзакций Delta Lake. Для получения более подробной информации читайте Погружение в Delta Lake: распаковка лога транзакций.
Теперь давайте перезагрузим данные, но на этот раз наш DataFrame будет поддерживаться Delta Lake.
# Загружаем данные о задержках рейсов в формате Delta Lake
delays_delta = spark \
.read \
.format("delta") \
.load("departureDelays.delta")
# Создаем временное представление
delays_delta.createOrReplaceTempView("delays_delta")
# Сколько рейсов между Сиэтлом и Сан-Франциско
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show()
Наконец, давайте определим количество рейсов из Сиэтла в Сан-Франциско; в этом наборе данных таких рейсов 1698.Преобразование на месте в Delta LakeЕсли у вас есть уже существующие таблицы Parquet, у вас появилась возможность выполнять преобразование ваших таблиц в Delta Lake на месте, т.е. вам не нужно переписывать таблицу. Чтобы преобразовать таблицу, вам достаточно выполнить следующие команды.
from delta.tables import *
# Преобразовать не секционированную таблицу parquet по пути '/path/to/table'
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`")
# Преобразовать секционированную таблицу parquet по пути '/path/to/table', секционированную целочисленным столбцом с именем 'part'
partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`", "part int")
Для получения более подробной информации, в том числе и о том, как сделать это преобразование в Scala и SQL, читайте Преобразование в Delta Lake.Удаление наших летных данныхЧтобы удалить данные из традиционной Data Lake таблицы, вам необходимо:
- Выбрать все данные из вашей таблицы, за исключением строк, которые вы хотите удалить.
- Создать новую таблицу на основе предыдущего запроса.
- Удалить исходную таблицу.
- Назвать новую таблицу именем исходной таблицы для нисходящих зависимостей.
Вместо выполнения всех этих шагов в Delta Lake мы можем упростить этот процесс, выполнив оператор DELETE. Чтобы продемонстрировать это, давайте удалим все рейсы, которые прибыли вовремя или раньше назначенного времени (т.е. delay < 0).
from delta.tables import *
from pyspark.sql.functions import *
# Доступ к таблице Delta Lake
deltaTable = DeltaTable.forPath(spark, pathToEventsTable
)
# Удаляем все рейсы пришедшие раньше или вовремя
deltaTable.delete("delay < 0")
# Сколько рейсов между Сиэтлом и Сан-Франциско
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show()
После удаления (подробнее об этом ниже) всех рейсов пришедших вовремя или раньше, как вы можете видеть из предыдущего запроса, осталось 837 опоздавших рейсов из Сиэтла в Сан-Франциско. Если вы заглянете в файловую систему, вы заметите, что файлов стало больше, даже если вы удалили данные.
/departureDelays.delta$ ls -l
_delta_log
part-00000-a2a19ba4-17e9-4931-9bbf-3c9d4997780b-c000.snappy.parquet
part-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquet
part-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquet
part-00001-a0423a18-62eb-46b3-a82f-ca9aac1f1e93-c000.snappy.parquet
part-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquet
part-00002-bfaa0a2a-0a31-4abf-aa63-162402f802cc-c000.snappy.parquet
part-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet
part-00003-b0247e1d-f5ce-4b45-91cd-16413c784a66-c000.snappy.parquet
В традиционных озерах данных удаление (операция delete) выполняется путем перезаписи всей таблицы, за исключением значений, которые должны быть удалены. В Delta Lake вместо этого удаление выполняется путем выборочной записи новых версий файлов, содержащих удаляемые данные, а предыдущие файлы только помечаются как удаленные. Это связано с тем, что для выполнения атомарных операций с таблицей Delta Lake использует управление параллельным доступом с помощью многоверсионности: например, пока один пользователь удаляет данные, другой пользователь может запрашивать предыдущую версию таблицы. Эта многоверсионная модель также позволяет нам путешествовать назад во времени (механизм путешествия во времени) и запрашивать предыдущие версии, как мы увидим позже. Обновление наших летных данных Чтобы обновить данные из вашей традиционной таблицы Data Lake, вам необходимо:
- Выбрать все данные из вашей таблицы, за исключением строк, которые вы хотите изменить.
- Изменить строки, которые необходимо обновить/изменить.
- Объединить эти две таблицы, чтобы создать новую таблицу.
- Удалить исходную таблицу.
- Назвать новую таблицу именем исходной таблицы для нисходящих зависимостей.
Вместо выполнения всех этих шагов в Delta Lake мы можем упростить этот процесс, выполнив оператор UPDATE. Чтобы продемонстрировать это, давайте обновим информацию обо всех рейсах из Детройта, заменив Детройт на Сиэтл.
# Обновляем все рейсы из Детройта, чтобы теперь они стали из Сиэтла
deltaTable.update("origin = 'DTW'", { "origin": "'SEA'" } )
# Сколько рейсов между Сиэтлом и Сан-Франциско
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show()
Теперь, когда рейсы из Детройта теперь помечены как рейсы Сиэтла, у нас насчитывается 986 рейсов, вылетающих из Сиэтла в Сан-Франциско. Если выведете содержимое папки файловой системы departureDelays (например, $../departureDelays/ls -l), вы заметите, что теперь там 11 файлов (вместо 8 сразу после удаления файлов и четырех файлов после создания таблицы).Объединение наших летных данныхРаспространенный сценарий при работе с озером данных - это постоянное добавление данных в вашу таблицу. Это часто приводит к дублированию данных (строки, которые вы бы не хотели снова вставлять в таблицу), появлению новых строк, которые необходимо вставить, и строк, которые необходимо обновить. В Delta Lake все это можно решить с помощью операции merge (аналогично оператору SQL MERGE).Начнем с выборочного набора данных, который вы хотите обновить, вставить или дедуплицировать с помощью следующего запроса.
# Какие рейсы между Сиэтлом (SEA) и Сан-Франциско (SFO) приходятся на эти даты
spark.sql("select * from delays_delta where origin = 'SEA' and destination = 'SFO' and date like '1010%' limit 10").show()
Результат этого запроса приведен таблице ниже. Обратите внимание, что для наглядности были добавлены цветные выделения, чтобы четко определить, какие строки дедуплицированы (синий), обновлены (желтый) и вставлены (зеленый).
Затем давайте сгенерируем нашу собственную merge_table, содержащую данные, которые мы будем вставлять, обновлять или дедуплицировать с помощью следующего фрагмента кода.
items = [(1010710, 31, 590, 'SEA', 'SFO'), (1010521, 10, 590, 'SEA', 'SFO'), (1010822, 31, 590, 'SEA', 'SFO')]
cols = ['date', 'delay', 'distance', 'origin', 'destination']
merge_table = spark.createDataFrame(items, cols)
merge_table.toPandas()
В предыдущей таблице (merge_table) есть три строки, с уникальным значением даты:
- 1010521: эта строка обновит таблицу flights новым значением задержки (желтый)
- 1010710: эта строка является дубликатом (синий)
- 1010822: это новая строка будет вставлена (зеленый)
В Delta Lake все это можно легко сделать с помощью оператора merge, как указано в следующем фрагменте кода.
# Объединяем merge_table с flights
deltaTable.alias("flights") \
.merge(merge_table.alias("updates"),"flights.date = updates.date") \
.whenMatchedUpdate(set = { "delay" : "updates.delay" } ) \
.whenNotMatchedInsertAll() \
.execute()
# Какие рейсы между Сиэтлом (SEA) и Сан-Франциско (SFO) приходятся на эти даты
spark.sql("select * from delays_delta where origin = 'SEA' and destination = 'SFO' and date like '1010%' limit 10").show()
Все три действия: дедупликация, обновление и вставка были эффективно выполнены с помощью одного оператора.
Просмотр истории таблицыКак уже отмечалось ранее, после каждой нашей транзакции (удаление, обновление) в файловой системе создавалось все больше файлов. Это связано с тем, что для каждой транзакции существуют разные версии таблицы Delta Lake. Это можно увидеть с помощью метода DeltaTable.history(), как показано ниже.
deltaTable.history().show()
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
| 2|2019-09-29 15:41:22| null| null| UPDATE|[predicate -> (or...|null| null| null| 1| null| false|
| 1|2019-09-29 15:40:45| null| null| DELETE|[predicate -> ["(...|null| null| null| 0| null| false|
| 0|2019-09-29 15:40:14| null| null| WRITE|[mode -> Overwrit...|null| null| null| null| null| false|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
Вы также можете выполнить это с помощью SQL:spark.sql("DESCRIBE HISTORY '" + pathToEventsTable + "'").show()Как видите, есть три строки, представляющие различные версии таблицы (ниже представлена сокращенная версия для облегчения чтения) для каждой операции (создание таблицы, удаление и обновление):versiontimestampoperationoperationParameters22019-09-29 15:41:22UPDATE[predicate -> (or…12019-09-29 15:40:45DELETE[predicate -> [“(…02019-09-29 15:40:14WRITE[mode -> Overwrit…Путешествие назад во времени по истории таблицыС помощью механизма путешествия во времени вы можете просмотреть таблицу Delta Lake указанной версии или для определенного таймстемпа. Для получения более подробной информации читайте Документацию Delta Lake > Чтение старых версии данных с помощью Time Travel. Чтобы просмотреть старые данные, укажите опцию version или Timestamp; в фрагменте кода ниже, мы указываем version
# Загружаем DataFrames для каждой версии
dfv0 = spark.read.format("delta").option("versionAsOf", 0).load("departureDelays.delta")
dfv1 = spark.read.format("delta").option("versionAsOf", 1).load("departureDelays.delta")
dfv2 = spark.read.format("delta").option("versionAsOf", 2).load("departureDelays.delta")
# Рассчитываем количество полетов от Сиэтла (SEA) до Сан-Франциско (SFO) для каждой версии истории
cnt0 = dfv0.where("origin = 'SEA'").where("destination = 'SFO'").count()
cnt1 = dfv1.where("origin = 'SEA'").where("destination = 'SFO'").count()
cnt2 = dfv2.where("origin = 'SEA'").where("destination = 'SFO'").count()
# Выводим значение
print("SEA -> SFO Counts: Create Table: %s, Delete: %s, Update: %s" % (cnt0, cnt1, cnt2))
## Вывод
SEA -> SFO Counts: Create Table: 1698, Delete: 837, Update: 986
Очистка старых версий таблиц с помощью VacuumМетод vacuum Delta Lake по умолчанию удаляет все строки (и файлы) старше 7 дней (референс: Delta Lake Vacuum). Если бы вы заглянули в файловую систему, вы бы заметили 11 файлов для своей таблицы.
/departureDelays.delta$ ls -l
_delta_log
part-00000-5e52736b-0e63-48f3-8d56-50f7cfa0494d-c000.snappy.parquet
part-00000-69eb53d5-34b4-408f-a7e4-86e000428c37-c000.snappy.parquet
part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet
part-00001-20893eed-9d4f-4c1f-b619-3e6ea1fdd05f-c000.snappy.parquet
part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet
part-00001-d4823d2e-8f9d-42e3-918d-4060969e5844-c000.snappy.parquet
part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet
part-00002-3027786c-20a9-4b19-868d-dc7586c275d4-c000.snappy.parquet
part-00002-f2609f27-3478-4bf9-aeb7-2c78a05e6ec1-c000.snappy.parquet
part-00003-850436a6-c4dd-4535-a1c0-5dc0f01d3d55-c000.snappy.parquet
part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet
Чтобы удалить все файлы и сохранить только текущий снапшот данных, укажите очень маленькое значение в методе vacuum (вместо 7 дней хранения по умолчанию).
Вы можете выполнить ту же задачу с помощью SQL синтаксиса: ¸
# Удаляем все файлы старше 0 часов
spark.sql(“VACUUM ‘” + pathToEventsTable + “‘ RETAIN 0 HOURS”)
После завершения операции vacuum, при просмотре файловой системы вы заметите намного меньше файлов, так как старые данные были удалены.
/departureDelays.delta$ ls -l
_delta_log
part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet
part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet
part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet
part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet
Обратите внимание, что возможность переместиться во времени обратно к версии старше периода хранения теряется после запуска vacuum.Что дальше?Попробуйте Delta Lake уже сегодня, запустив приведенные выше фрагменты кода на своем инстансе Apache Spark 2.4.3 (или выше). Используя Delta Lake, вы можете сделать свои озера данных более надежными (независимо от того, создаете ли вы новое или мигрируете существующее озеро данных). Чтобы узнать больше, посетите https://delta.io/ и присоединитесь к сообществу Delta Lake в Slack и Google Group. Вы можете отслеживать все предстоящие релизы и запланированные фичи в github milestones.Записаться на бесплатный демо-урок.Читать ещё:
===========
Источник:
habr.com
===========
===========
Автор оригинала: Denny Lee and Tathagata Das
===========Похожие новости:
- [Программирование, Разработка под iOS, Swift] Разница между @StateObject, @EnvironmentObject и @ObservedObject в SwiftUI (перевод)
- [Python, Программирование, Машинное обучение] Быстрый градиентный бустинг с CatBoost (перевод)
- [Разработка под iOS, Системы сборки, Облачные сервисы] Интеграция CI/CD для нескольких сред с Jenkins и Fastlane. Часть 2 (перевод)
- [Python, Обработка изображений, Машинное обучение, Облачные сервисы, Kubernetes] Архитектура облачного волейбольного сервиса
- [Java, DevOps, Kubernetes] Пример развертывания Spring Boot-приложения в Kubernetes (перевод)
- [Cisco, Сетевые технологии] VxLAN фабрика часть 4. Multipod
- [] Жуткие байки айтишников
- [Python, Программирование] Метаклассы в Python (перевод)
- [Программирование, Data Mining, ООП, R, Data Engineering] ООП в языке R (часть 2): R6 классы
- [Информационная безопасность, Big Data, Data Engineering] Модели угроз в дифференциальной приватности (перевод)
Теги для поиска: #_python, #_data_engineering, #_delta_lake, #_python, #_python_api, #_blog_kompanii_otus._onlajnobrazovanie (
Блог компании OTUS. Онлайн-образование
), #_python, #_data_engineering
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 25-Ноя 19:17
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
Приглашаем всех желающих на бесплатный демо-урок,в рамках которого рассмотрим Ni-Fi и роль data ingestion инструментов в целом при построении систем обработки данных. А также решим простую задачку по построению пайплайна для загрузки файлов в хранилище данных с использованием Ni-Fi. Урок проведет эксперт OTUS - Егор Матешук.
А прямо сейчас традиционно делимся полезным переводом.
# Подключение пакета в Spark
./bin/pyspark --packages io.delta:delta-core2.11:0.4.0 --conf "spark.databricks.delta.retentionDurationCheck.enabled=false" --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" # Переменные местонахождения
tripdelaysFilePath = "/root/data/departuredelays.csv" pathToEventsTable = "/root/deltalake/departureDelays.delta" # Чтение данных о задержках рейсов departureDelays = spark.read \ .option("header", "true") \ .option("inferSchema", "true") \ .csv(tripdelaysFilePath) # Сохраняем данные о задержках рейсов в формате Delta Lake
departureDelays \ .write \ .format("delta") \ .mode("overwrite") \ .save("departureDelays.delta") /departureDelays.delta$ ls -l
. .. _delta_log part-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquet part-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquet part-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquet part-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet Отметим, что _delta_log - папка, которая содержит лог транзакций Delta Lake. Для получения более подробной информации читайте Погружение в Delta Lake: распаковка лога транзакций.
# Загружаем данные о задержках рейсов в формате Delta Lake
delays_delta = spark \ .read \ .format("delta") \ .load("departureDelays.delta") # Создаем временное представление delays_delta.createOrReplaceTempView("delays_delta") # Сколько рейсов между Сиэтлом и Сан-Франциско spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show() Наконец, давайте определим количество рейсов из Сиэтла в Сан-Франциско; в этом наборе данных таких рейсов 1698.Преобразование на месте в Delta LakeЕсли у вас есть уже существующие таблицы Parquet, у вас появилась возможность выполнять преобразование ваших таблиц в Delta Lake на месте, т.е. вам не нужно переписывать таблицу. Чтобы преобразовать таблицу, вам достаточно выполнить следующие команды. from delta.tables import *
# Преобразовать не секционированную таблицу parquet по пути '/path/to/table' deltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`") # Преобразовать секционированную таблицу parquet по пути '/path/to/table', секционированную целочисленным столбцом с именем 'part' partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`", "part int")
from delta.tables import *
from pyspark.sql.functions import * # Доступ к таблице Delta Lake deltaTable = DeltaTable.forPath(spark, pathToEventsTable ) # Удаляем все рейсы пришедшие раньше или вовремя deltaTable.delete("delay < 0") # Сколько рейсов между Сиэтлом и Сан-Франциско spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show() После удаления (подробнее об этом ниже) всех рейсов пришедших вовремя или раньше, как вы можете видеть из предыдущего запроса, осталось 837 опоздавших рейсов из Сиэтла в Сан-Франциско. Если вы заглянете в файловую систему, вы заметите, что файлов стало больше, даже если вы удалили данные. /departureDelays.delta$ ls -l
_delta_log part-00000-a2a19ba4-17e9-4931-9bbf-3c9d4997780b-c000.snappy.parquet part-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquet part-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquet part-00001-a0423a18-62eb-46b3-a82f-ca9aac1f1e93-c000.snappy.parquet part-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquet part-00002-bfaa0a2a-0a31-4abf-aa63-162402f802cc-c000.snappy.parquet part-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet part-00003-b0247e1d-f5ce-4b45-91cd-16413c784a66-c000.snappy.parquet
# Обновляем все рейсы из Детройта, чтобы теперь они стали из Сиэтла
deltaTable.update("origin = 'DTW'", { "origin": "'SEA'" } ) # Сколько рейсов между Сиэтлом и Сан-Франциско spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show() Теперь, когда рейсы из Детройта теперь помечены как рейсы Сиэтла, у нас насчитывается 986 рейсов, вылетающих из Сиэтла в Сан-Франциско. Если выведете содержимое папки файловой системы departureDelays (например, $../departureDelays/ls -l), вы заметите, что теперь там 11 файлов (вместо 8 сразу после удаления файлов и четырех файлов после создания таблицы).Объединение наших летных данныхРаспространенный сценарий при работе с озером данных - это постоянное добавление данных в вашу таблицу. Это часто приводит к дублированию данных (строки, которые вы бы не хотели снова вставлять в таблицу), появлению новых строк, которые необходимо вставить, и строк, которые необходимо обновить. В Delta Lake все это можно решить с помощью операции merge (аналогично оператору SQL MERGE).Начнем с выборочного набора данных, который вы хотите обновить, вставить или дедуплицировать с помощью следующего запроса. # Какие рейсы между Сиэтлом (SEA) и Сан-Франциско (SFO) приходятся на эти даты
spark.sql("select * from delays_delta where origin = 'SEA' and destination = 'SFO' and date like '1010%' limit 10").show() Затем давайте сгенерируем нашу собственную merge_table, содержащую данные, которые мы будем вставлять, обновлять или дедуплицировать с помощью следующего фрагмента кода. items = [(1010710, 31, 590, 'SEA', 'SFO'), (1010521, 10, 590, 'SEA', 'SFO'), (1010822, 31, 590, 'SEA', 'SFO')]
cols = ['date', 'delay', 'distance', 'origin', 'destination'] merge_table = spark.createDataFrame(items, cols) merge_table.toPandas() В предыдущей таблице (merge_table) есть три строки, с уникальным значением даты:
# Объединяем merge_table с flights
deltaTable.alias("flights") \ .merge(merge_table.alias("updates"),"flights.date = updates.date") \ .whenMatchedUpdate(set = { "delay" : "updates.delay" } ) \ .whenNotMatchedInsertAll() \ .execute() # Какие рейсы между Сиэтлом (SEA) и Сан-Франциско (SFO) приходятся на эти даты spark.sql("select * from delays_delta where origin = 'SEA' and destination = 'SFO' and date like '1010%' limit 10").show() Просмотр истории таблицыКак уже отмечалось ранее, после каждой нашей транзакции (удаление, обновление) в файловой системе создавалось все больше файлов. Это связано с тем, что для каждой транзакции существуют разные версии таблицы Delta Lake. Это можно увидеть с помощью метода DeltaTable.history(), как показано ниже. deltaTable.history().show()
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+ |version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend| +-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+ | 2|2019-09-29 15:41:22| null| null| UPDATE|[predicate -> (or...|null| null| null| 1| null| false| | 1|2019-09-29 15:40:45| null| null| DELETE|[predicate -> ["(...|null| null| null| 0| null| false| | 0|2019-09-29 15:40:14| null| null| WRITE|[mode -> Overwrit...|null| null| null| null| null| false| +-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+ # Загружаем DataFrames для каждой версии
dfv0 = spark.read.format("delta").option("versionAsOf", 0).load("departureDelays.delta") dfv1 = spark.read.format("delta").option("versionAsOf", 1).load("departureDelays.delta") dfv2 = spark.read.format("delta").option("versionAsOf", 2).load("departureDelays.delta") # Рассчитываем количество полетов от Сиэтла (SEA) до Сан-Франциско (SFO) для каждой версии истории cnt0 = dfv0.where("origin = 'SEA'").where("destination = 'SFO'").count() cnt1 = dfv1.where("origin = 'SEA'").where("destination = 'SFO'").count() cnt2 = dfv2.where("origin = 'SEA'").where("destination = 'SFO'").count() # Выводим значение print("SEA -> SFO Counts: Create Table: %s, Delete: %s, Update: %s" % (cnt0, cnt1, cnt2)) ## Вывод SEA -> SFO Counts: Create Table: 1698, Delete: 837, Update: 986 /departureDelays.delta$ ls -l
_delta_log part-00000-5e52736b-0e63-48f3-8d56-50f7cfa0494d-c000.snappy.parquet part-00000-69eb53d5-34b4-408f-a7e4-86e000428c37-c000.snappy.parquet part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet part-00001-20893eed-9d4f-4c1f-b619-3e6ea1fdd05f-c000.snappy.parquet part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet part-00001-d4823d2e-8f9d-42e3-918d-4060969e5844-c000.snappy.parquet part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet part-00002-3027786c-20a9-4b19-868d-dc7586c275d4-c000.snappy.parquet part-00002-f2609f27-3478-4bf9-aeb7-2c78a05e6ec1-c000.snappy.parquet part-00003-850436a6-c4dd-4535-a1c0-5dc0f01d3d55-c000.snappy.parquet part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet Вы можете выполнить ту же задачу с помощью SQL синтаксиса: ¸
# Удаляем все файлы старше 0 часов spark.sql(“VACUUM ‘” + pathToEventsTable + “‘ RETAIN 0 HOURS”) /departureDelays.delta$ ls -l
_delta_log part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet =========== Источник: habr.com =========== =========== Автор оригинала: Denny Lee and Tathagata Das ===========Похожие новости:
Блог компании OTUS. Онлайн-образование ), #_python, #_data_engineering |
|
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 25-Ноя 19:17
Часовой пояс: UTC + 5