[Big Data, Хранение данных, Хранилища данных, Data Engineering] Что нам стоит… загрузить JSON в Data Platform
Автор
Сообщение
news_bot ®
Стаж: 6 лет 9 месяцев
Сообщений: 27286
Всем привет! В недавней статье мы рассказали, как мы шли к построению нашей Data Platform. Сегодня хотелось бы глубже погрузиться в «желудок» нашей платформы и попутно рассказать вам о том, как мы решали одну из задач, которая возникла в связи с ростом разнообразия интегрируемых источников данных. То есть, если возвращаться к финальной картинке из упомянутой выше статьи (специально дублирую ее, чтобы уважаемым читателям было удобнее), то сегодня мы будем более углубленно говорить о реализации «правой части» схемы — той, что лежит после Apache NiFi.
Схема из прошлой нашей статьи.Напомним, что в нашей компании более 350 реляционных баз данных. Естественно, не все они «уникальны» и многие представляют собой по сути разные экземпляры одной и той же системы, установленной во всех магазинах торговой сети, но все же «зоопарк разнообразия» присутствует. Поэтому без какого-либо Framework’а, упрощающего и ускоряющего интеграцию источников в Data Platform, не обойтись. Общая схема доставки данных из источников в ODS-слой Greenplum посредством разработанного нами framework’а приведена ниже:
Общая схема доставки данных в ODS-слой Greenplum
- Данные из систем-источников пишутся в Kafka в AVRO-формате, обрабатываются в режиме реального времени Apache NiFi, который сохраняет их в формате parquet на S3.
- Затем эти файлы с «сырыми» данными с помощью Spark’а обрабатываются в два этапа:
- Compaction – на данном этапе выполняется объединение для снижения количества выходных файлов с целью оптимизации записи и последующего чтения (то есть несколько более мелких файлов объединяются в несколько файлов «побольше»), а также производится дедубликация данных: простой distinct() и затем coalesce(). Результат сохраняется на S3. Эти файлы используются затем для parsing'а , а также являются своеобразным архивом сырых необработанных данных в формате «как есть»;
- Parsing – на этой фазе производится разбор входных данных и сохранение их в плоские структуры согласно маппингу, описанному в метаданных. В общем случае из одного входного файла можно получить на выходе несколько плоских структур, которые в виде сжатых (как правило gzip) CSV-файлов сохраняются на S3.
- Заключительный этап – загрузка данных CSV-файлов в ODS-слой хранилища: создается временная external table над данными в S3 через PXF S3 connector, после чего данные уже простым pgsql переливаются в таблицы ODS-слоя Greenplum
- Все это оркестрируется с помощью Airflow.
DAG’и для Airflow у нас генерируются динамически на основании метаданных. Parsing файлов и разложение их в плоские структуры также производится с использованием метаданных. Это приводит к упрощению интеграции нового источника, так как, для этого необходимо всего лишь:
- создать в ODS-слое Хранилища таблицы-приемники данных;
- в репозитории метаданных в Git согласно принятым стандартам прописать в виде отдельных YAML-файлов:
- общие настройки источника (такие как: расписание загрузки, входной и выходной формат файлов с данными, S3-бакет, имя сервисного пользователя, имя и email владельца для нотификации в случае сбоя загрузки и т.п.);
- маппинг данных объектов источника на таблицы слоя ODS (при этом поддерживаются как плоские, так и вложенные структуры и массивы, а также есть возможность данные из одного объекта раскладывать в ODS-слое по нескольким таблицам). То есть описать, как необходимо сложную вложенную структуру разложить в плоские таблицы;
До недавнего времени такой подход удовлетворял текущие наши потребности, но количество и разнообразие источников данных растет. У нас стали появляться источники, которые не являются реляционными базами данных, а генерируют данные в виде потока JSON-объектов. Кроме того на горизонте уже маячила интеграция источника, который под собой имел MongoDB и поэтому будет использовать MongoDB Kafka source connector для записи данных в Kafka. Поэтому остро встала необходимость доработки нашего framework’а для поддержки такого сценария. Хотелось, чтобы данные источника сразу попадали на S3 в формате JSON - то есть в формате "как есть", без лишнего шага конвертации в parquet посредством Apache NiFi. В первую очередь необходимо было доработать шаг Compaction. Его код, если убрать всю «обвязку» и выделить только главное, очень простой:
df = spark.read.format(in_format) \
.options(**in_options) \
.load(path) \
.distinct()
new_df = df.coalesce(div)
new_df.write.mode("overwrite") \
.format(out_format) \
.options(**out_options) \
.save(path)
Но если мы проделаем все те же манипуляции с JSON-данными, то волей-неволей внесем изменения во входные данные, так как при чтении JSON’ов Spark автоматом определит и сделает mergeSchema, т.е. мы тем самым можем исказить входные данные, чего не хотелось бы. Ведь наша задача на этом шаге – только укрупнить файлы и дедублицировать данные, без какого-либо вмешательства в их структуру и наполнение. То есть сохранить их «как есть». По-хорошему, нам надо было просто прочитать данные как обычный текст, дедублицировать строки, укрупнить файлы и положить обратно на S3. Для этого был предложен достаточно изящный способ:
рассматривать файлы с JSON-объектами как DataFrame с одной колонкой, содержащей весь JSON-объект.
Попробуем сделать это. Допустим, мы имеем следующий файл данных:file1:
{«productId»: 1, «productName»: «ProductName 1», «tags»: [«tag 1», «tag 2»], «dimensions»: {«length»: 10, «width»: 12, «height»: 12.5}}
{«productId»: 2, «price»: 10.01, «tags»: [«tag 1», «tag 2»], «dimensions»: {«length»: 10, «width»: 12, «height»: 12.5}}
Обратите внимание на формат этого файла. Это файл с JSON-объектами, где 1 строка = 1 объект. Оставаясь, по сути, JSON-ом, он при этом не пройдет синтаксическую JSON-валидацию. Именно в таком виде мы сохраняем JSON-данные на S3 (есть специальная "галочка в процессоре Apache NiFi).Прочитаем файл предлагаемым способом:
# Читаем данные
df = spark.read \
.format("csv") \
.option("sep", "\a") \
.load("file1.json")
# Схема получившегося DataFrame
df.printSchema()
root
|-- _c0: string (nullable = true)
# Сами данные
df.show()
+--------------------+
| _c0|
+--------------------+
|{"productId": 1, ...|
|{"productId": 2, ...|
+--------------------+
То есть мы тут читаем JSON как обычный CSV, указывая разделитель, который никогда заведомо не встретится в наших данных. Например, Bell character. В итоге мы получим DataFrame из одного поля, к которому можно будет также применить dicstinct() и затем coalesce(), то есть менять существующий код не потребуется. Нам остается только определить опции в зависимости от формата:
# Для parquet
in_format = "parquet"
in_options = {}
# Для JSON
in_format = "csv"
in_options = {"sep": "\a"}
Ну и при сохранении этого же DataFrame обратно на S3 в зависимости от формата данных опять применяем разные опции:
df.write.mode("overwrite") \
.format(out_format) \
.options(**out_options) \
.save(path)
# для JSON
out_format = "text"
out_options = {"compression": "gzip"}
# для parquet
out_format = input_format
out_options = {"compression": "snappy"}
Следующей точкой доработки был шаг Parsing. В принципе, ничего сложного, если бы задача при этом упиралась в одну маленькую деталь: JSON -файл, в отличии от parquet, не содержит в себе схему данных. Для разовой загрузки это не является проблемой, так как при чтении JSON-файла Spark умеет сам определять схему, и даже в случае, если файл содержит несколько JSON-объектов с немного отличающимся набором полей, корректно выполнит mergeSchema. Но для регулярного процесса мы не могли уповать на это. Банально может случиться так, что во всех записях какого-то файла с данными может не оказаться некоего поля «field_1», так как, например, в источнике оно заполняется не во всех случаях. Тогда в получившемся Spark DataFrame вообще не окажется этого поля, и наш Parsing, построенный на метаданных, просто-напросто упадет с ошибкой из-за того, что не найдет прописанное в маппинге поле. Проиллюстрирую. Допустим,у нас есть два файла из одного источника со следующим наполнением:file1 (тот же что и в примере выше):
{«productId»: 1, «productName»: «ProductName 1», «tags»: [«tag 1», «tag 2»], «dimensions»: {«length»: 10, «width»: 12, «height»: 12.5}}
{«productId»: 2, «price»: 10.01, «tags»: [«tag 1», «tag 2»], «dimensions»: {«length»: 10, «width»: 12, «height»: 12.5}}
file2:
{«productId»: 3, «productName»: «ProductName 3», «dimensions»: {«length»: 10, «width»: 12, «height»: 12.5, «package»: [10, 20.5, 30]}}
Теперь прочитаем Spark’ом их и посмотрим данные и схемы получившихся DataFrame:
df = spark.read \
.format("json") \
.option("multiline", "false") \
.load(path)
df.printSchema()
df.show()
Первый файл (схема и данные):
root
|-- dimensions: struct (nullable = true)
| |-- height: double (nullable = true)
| |-- length: long (nullable = true)
| |-- width: long (nullable = true)
|-- price: double (nullable = true)
|-- productId: long (nullable = true)
|-- productName: string (nullable = true)
|-- tags: array (nullable = true)
| |-- element: string (containsNull = true)
+--------------+-----+---------+-------------+--------------+
| dimensions|price|productId| productName| tags|
+--------------+-----+---------+-------------+--------------+
|[12.5, 10, 12]| null| 1|ProductName 1|[tag 1, tag 2]|
|[12.5, 10, 12]|10.01| 2| null|[tag 1, tag 2]|
+--------------+-----+---------+-------------+--------------+
Второй файл (схема и данные):
root
|-- dimensions: struct (nullable = true)
| |-- height: double (nullable = true)
| |-- length: long (nullable = true)
| |-- package: array (nullable = true)
| | |-- element: double (containsNull = true)
| |-- width: long (nullable = true)
|-- productId: long (nullable = true)
|-- productName: string (nullable = true)
+--------------------+---------+-------------+
| dimensions|productId| productName|
+--------------------+---------+-------------+
|[12.5, 10, [10.0,...| 3|ProductName 3|
+--------------------+---------+-------------+
Как видно, Spark корректно выстроил схему отдельно для каждого файла. Если в какой-либо записи не было обнаружено поля, имеющегося в другой, то в DataFrame мы видим корректное проставление null (поля price и productName для первого файла). Но в целом схемы получились разные, и если у нас в маппинге прописано, что нам нужно распарсить эти данные (то есть оба файла) в следующую плоскую структуру,
root
|-- price: double (nullable = true)
|-- productId: long (nullable = true)
|-- productName: string (nullable = true)
а во входных данных у нас присутствуют только файлы «а-ля file2», где поля «price» нет ни у одной записи, то Spark упадет с ошибкой, так как не найдет поля «price» для формирования выходного DataFrame. С parquet-файлами такой проблемы как правило не возникает, так как сам parquet-файл генерируется из AVRO, который уже содержит полную схему данных и, соответственно, эта полная схема есть и в parquet-файле.Еще надо отметить, что, естественно, мы хотели по максимум переиспользовать уже существующий и зарекомендовавший себя код нашего framework’а, а не городить какую-то полностью отдельную ветку для обработки JSON’ов – то есть все изменения хотелось сделать на этапе чтения JSON-файлов с S3.Таким образом очевидно, что для корректной загрузки данных из JSON-файлов необходимо предопределить схему JSON-файла с данными и читать файлы уже с применением этой схемы. Тогда у нас даже если в JSON’е нет какого-то поля, то в самом DataFrame это поле будет, но его значение подставится как null:
df = spark.read \
.format("json") \
.option("multiline","false") \
.schema(df_schema) \
.load(path)
Первая мысль была использовать для хранения схемы имеющийся сервис метаданных - то есть описать схему в YAML-формате и сохранить в имеющемся репозитории. Но с учетом того, что все данные источников у нас проходят через Kafka, решили, что логично для хранения схем использовать имеющийся Kafka Schema Registry, а схему хранить в стандартном для JSON формате (другой формат, кстати говоря, Kafka Schema Registry не позволил бы хранить). В общем, вырисовывалась следующая реализация:
- Читаем из Kafka Schema Registry схему
- Импортируем ее в pyspark.sql.types.StructType – что-то типа такого:
# 1. получаем через Kafka Schema Registry REST API схему данных
# 2. записываем ее в переменную schema и далее:
df_schema = StructType.fromJson(schema)
- Ну и с помощью полученной схемы читаем JSON-файлы
Звучит хорошо, если бы… Давайте посмотрим на формат JSON-схемы, понятной Spark’у. Пусть имеем простой JSON из file2 выше. Посмотреть его схему в формате JSON можно, выполнив:
df.schema.json()
Получившаяся схема
{
"fields":
[
{
"metadata": {},
"name": "dimensions",
"nullable": true,
"type":
{
"fields":
[
{"metadata":{},"name":"height","nullable":true,"type":"double"},
{"metadata":{},"name":"length","nullable":true,"type":"long"},
{"metadata":{},"name":"width","nullable":true,"type":"long"}
],
"type": "struct"
}
},
{
"metadata": {},
"name": "price",
"nullable": true,
"type": "double"
},
{
"metadata": {},
"name": "productId",
"nullable": true,
"type": "long"
},
{
"metadata": {},
"name": "productName",
"nullable": true,
"type": "string"
},
{
"metadata": {},
"name": "tags",
"nullable": true,
"type":
{
"containsNull": true,
"elementType": "string",
"type": "array"
}
}
],
"type": "struct"
}
Как видно, это совсем не стандартный формат JSON-схемы. «Но мы же наверняка не первые, у кого встала задача конвертировать стандартную JSON-схему в формат, понятный Spark’у» - подумали мы и … В принципе, не ошиблись, но несколько часов поиска упорно выводили исключительно на примеры, из серии:
как сохранить схему уже прочитанного DataFrame в JSON, затем использовать повторно
либо на репозиторий https://github.com/zalando-incubator/spark-json-schema, который нам бы подошел, если мы использовали Scala, а не pySpark … В общем, на горизонте маячила перспектива писать SchemaConverter. Сперва решили отделаться малой кровью и написать простенький конвертер – никаких ссылок и прочих сложных конструкций. Конвертер был успешно протестирован на синтетических данных, после чего захотелось «скормить» ему что-то приближенное к реальности. К счастью, у нас уже был один источник, генерирующий данные в формате JSON. Как временное решение схема его интеграции в DataPlatform была незамысловата: NiFi читал данные из Kafka, преобразовывал их в parquet, использую «прибитую гвоздями» в NiFi схему в формате AVRO-schema, и складывал на S3. Схема данных была действительно непростой и с кучей вложенных структур и нескольких десятков полей - неплохой тест-кейс в общем-то:Посмотреть длинную портянку, если кому интересно :)
root
|-- taskId: string (nullable = true)
|-- extOrderId: string (nullable = true)
|-- taskStatus: string (nullable = true)
|-- taskControlStatus: string (nullable = true)
|-- documentVersion: long (nullable = true)
|-- buId: long (nullable = true)
|-- storeId: long (nullable = true)
|-- priority: string (nullable = true)
|-- created: struct (nullable = true)
| |-- createdBy: string (nullable = true)
| |-- created: string (nullable = true)
|-- lastUpdateInformation: struct (nullable = true)
| |-- updatedBy: string (nullable = true)
| |-- updated: string (nullable = true)
|-- customerId: string (nullable = true)
|-- employeeId: string (nullable = true)
|-- pointOfGiveAway: struct (nullable = true)
| |-- selected: string (nullable = true)
| |-- available: array (nullable = true)
| | |-- element: string (containsNull = true)
|-- dateOfGiveAway: string (nullable = true)
|-- dateOfGiveAwayEnd: string (nullable = true)
|-- pickingDeadline: string (nullable = true)
|-- storageLocation: string (nullable = true)
|-- currentStorageLocations: array (nullable = true)
| |-- element: string (containsNull = true)
|-- customerType: string (nullable = true)
|-- comment: string (nullable = true)
|-- totalAmount: double (nullable = true)
|-- currency: string (nullable = true)
|-- stockDecrease: boolean (nullable = true)
|-- offline: boolean (nullable = true)
|-- trackId: string (nullable = true)
|-- transportationType: string (nullable = true)
|-- stockRebook: boolean (nullable = true)
|-- notificationStatus: string (nullable = true)
|-- lines: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- lineId: string (nullable = true)
| | |-- extOrderLineId: string (nullable = true)
| | |-- productId: string (nullable = true)
| | |-- lineStatus: string (nullable = true)
| | |-- lineControlStatus: string (nullable = true)
| | |-- orderedQuantity: double (nullable = true)
| | |-- confirmedQuantity: double (nullable = true)
| | |-- assignedQuantity: double (nullable = true)
| | |-- pickedQuantity: double (nullable = true)
| | |-- controlledQuantity: double (nullable = true)
| | |-- allowedForGiveAwayQuantity: double (nullable = true)
| | |-- givenAwayQuantity: double (nullable = true)
| | |-- returnedQuantity: double (nullable = true)
| | |-- sellingScheme: string (nullable = true)
| | |-- stockSource: string (nullable = true)
| | |-- productPrice: double (nullable = true)
| | |-- lineAmount: double (nullable = true)
| | |-- currency: string (nullable = true)
| | |-- markingFlag: string (nullable = true)
| | |-- operations: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- operationId: string (nullable = true)
| | | | |-- type: string (nullable = true)
| | | | |-- reason: string (nullable = true)
| | | | |-- quantity: double (nullable = true)
| | | | |-- dmCodes: array (nullable = true)
| | | | | |-- element: string (containsNull = true)
| | | | |-- timeStamp: string (nullable = true)
| | | | |-- updatedBy: string (nullable = true)
| | |-- source: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- type: string (nullable = true)
| | | | |-- items: array (nullable = true)
| | | | | |-- element: struct (containsNull = true)
| | | | | | |-- assignedQuantity: double (nullable = true)
|-- linkedObjects: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- objectType: string (nullable = true)
| | |-- objectId: string (nullable = true)
| | |-- objectStatus: string (nullable = true)
| | |-- objectLines: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- objectLineId: string (nullable = true)
| | | | |-- taskLineId: string (nullable = true)
Естественно, я не захотел перебивать руками захардкоженную схему, а воспользовался одним из многочисленных онлайн-конвертеров, позволяющих из Avro-схемы сделать JSON-схему. И тут меня ждал неприятный сюрприз: все перепробованные мною конвертеры на выходе использовали гораздо больше синтаксических конструкций, чем «понимала» первая версия конвертера. Дополнительно пришло осознание, что также как и я, наши пользователи (а для нас пользователями в данном контексте являются владельцы источников данных) с большой вероятностью могут использовать подобные конвертеры для того, чтобы получить JSON-схему, которую надо зарегистрировать в Kafka Schema Registry, из «того, что у них есть». В результате наш SparkJsonSchemaConverter был доработан – появилась поддержка более сложных конструкций, таких как definitions, refs (только внутренние) и oneOf. Сам же парсер был оформлен уже в отдельный класс, который сразу «собирал» на основании JSON-схемы объект pyspark.sql.types.StructType
У нас почти сразу же родилась мысль, что хорошо бы было поделиться им с сообществом, так как мы в Леруа Мерлен сами активно используем продукты Open Source, созданные и поддерживаемые энтузиастами со всего мира, и хотим не просто их использовать, но и контрибьютить обратно, участвуя в разработке Open Source продуктов и поддерживая сообщество. В настоящий момент мы решаем внутренние орг.вопросы по схеме выкладывания данного конвертера в Open Source и, уверен, что в ближайшее время поделимся с сообществом этой наработкой!
В итоге благодаря написанному SparkJsonSchemaConverter’у доработка шага Parsing свелась только к небольшому «тюнингу» чтения данных с S3: в зависимости от формата входных данных источника (получаем из сервиса метаданных) читаем файлы с S3 немного по-разному:
# Для JSON
df = spark.read.format(in_format)\
.option("multiline", "false")\
.schema(json_schema) \
.load(path)
# Для parquet:
df = spark.read.format(in_format)\
.load(path)
А дальше отрабатывает уже существующий код, раскрывающий все вложенные структуры согласно маппингу и сохраняющий данные DataFrame’а в несколько плоских CSV-файлов. В итоге мы смогли при относительном минимуме внесенных изменений в код текущего framework’а добавить в него поддержку интеграции в нашу Data Platform JSON-источников данных. И результат нашей работы уже заметен:
- Всего через месяц после внедрения доработки у нас на ПРОДе проинтегрировано 4 новых JSON-источника!
- Все текущие интеграции даже «не заметили» расширения функционала framework’а, а значит, мы точно ничего не «сломали» произведенной доработкой.
===========
Источник:
habr.com
===========
Похожие новости:
- [Хостинг, Хранение данных, Облачные сервисы] К чёрту всё, я буду сам себе хостер (перевод)
- [Системное администрирование, Анализ и проектирование систем, Data Mining, DevOps, Data Engineering] Проблемы мониторинга дата-пайплайнов и как я их решал
- [Python, Алгоритмы, Big Data, R, Data Engineering] Запросить 100 серверов нельзя оптимизировать код. Ставим запятую
- [Big Data, Хранение данных] Вебинар про развитие цифровизации в финансовых учреждениях с помощью данных, аналитики и искусственного интеллекта
- [Java, Big Data, Конференции, Интервью] Паша Финкельштейн о Big Data, Apache Spark и DevRel
- [Big Data, Машинное обучение, Искусственный интеллект] ТОП-10 трендов в сфере данных и аналитики 2021. Версия Gartner (перевод)
- [.NET, Алгоритмы, C#, Big Data, Математика] MEX (Minimum EXcluded) Алгоритм поиска минимального отсутствующего числа
- [Python, Алгоритмы, Big Data, Машинное обучение, Искусственный интеллект] DataScience Digest — 10.06.21
- [Big Data, Искусственный интеллект, Финансы в IT, Data Engineering] Чтобы потолка не стало, а крышу не снесло: о чем новый подкаст ВТБ
- [Big Data, Изучение языков] «Симпсоны» — лучшее TV-шоу для изучения английских слов. Доказано Big Data
Теги для поиска: #_big_data, #_hranenie_dannyh (Хранение данных), #_hranilischa_dannyh (Хранилища данных), #_data_engineering, #_spark, #_json, #_data_platform, #_data_lake, #_pyspark, #_blog_kompanii_lerua_merlen (
Блог компании Леруа Мерлен
), #_big_data, #_hranenie_dannyh (
Хранение данных
), #_hranilischa_dannyh (
Хранилища данных
), #_data_engineering
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 12:36
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
Всем привет! В недавней статье мы рассказали, как мы шли к построению нашей Data Platform. Сегодня хотелось бы глубже погрузиться в «желудок» нашей платформы и попутно рассказать вам о том, как мы решали одну из задач, которая возникла в связи с ростом разнообразия интегрируемых источников данных. То есть, если возвращаться к финальной картинке из упомянутой выше статьи (специально дублирую ее, чтобы уважаемым читателям было удобнее), то сегодня мы будем более углубленно говорить о реализации «правой части» схемы — той, что лежит после Apache NiFi. Схема из прошлой нашей статьи.Напомним, что в нашей компании более 350 реляционных баз данных. Естественно, не все они «уникальны» и многие представляют собой по сути разные экземпляры одной и той же системы, установленной во всех магазинах торговой сети, но все же «зоопарк разнообразия» присутствует. Поэтому без какого-либо Framework’а, упрощающего и ускоряющего интеграцию источников в Data Platform, не обойтись. Общая схема доставки данных из источников в ODS-слой Greenplum посредством разработанного нами framework’а приведена ниже: Общая схема доставки данных в ODS-слой Greenplum
df = spark.read.format(in_format) \
.options(**in_options) \ .load(path) \ .distinct() new_df = df.coalesce(div) new_df.write.mode("overwrite") \ .format(out_format) \ .options(**out_options) \ .save(path) рассматривать файлы с JSON-объектами как DataFrame с одной колонкой, содержащей весь JSON-объект.
{«productId»: 1, «productName»: «ProductName 1», «tags»: [«tag 1», «tag 2»], «dimensions»: {«length»: 10, «width»: 12, «height»: 12.5}}
{«productId»: 2, «price»: 10.01, «tags»: [«tag 1», «tag 2»], «dimensions»: {«length»: 10, «width»: 12, «height»: 12.5}} # Читаем данные
df = spark.read \ .format("csv") \ .option("sep", "\a") \ .load("file1.json") # Схема получившегося DataFrame df.printSchema() root |-- _c0: string (nullable = true) # Сами данные df.show() +--------------------+ | _c0| +--------------------+ |{"productId": 1, ...| |{"productId": 2, ...| +--------------------+ # Для parquet
in_format = "parquet" in_options = {} # Для JSON in_format = "csv" in_options = {"sep": "\a"} df.write.mode("overwrite") \
.format(out_format) \ .options(**out_options) \ .save(path) # для JSON out_format = "text" out_options = {"compression": "gzip"} # для parquet out_format = input_format out_options = {"compression": "snappy"} {«productId»: 1, «productName»: «ProductName 1», «tags»: [«tag 1», «tag 2»], «dimensions»: {«length»: 10, «width»: 12, «height»: 12.5}}
{«productId»: 2, «price»: 10.01, «tags»: [«tag 1», «tag 2»], «dimensions»: {«length»: 10, «width»: 12, «height»: 12.5}} {«productId»: 3, «productName»: «ProductName 3», «dimensions»: {«length»: 10, «width»: 12, «height»: 12.5, «package»: [10, 20.5, 30]}}
df = spark.read \
.format("json") \ .option("multiline", "false") \ .load(path) df.printSchema() df.show() root
|-- dimensions: struct (nullable = true) | |-- height: double (nullable = true) | |-- length: long (nullable = true) | |-- width: long (nullable = true) |-- price: double (nullable = true) |-- productId: long (nullable = true) |-- productName: string (nullable = true) |-- tags: array (nullable = true) | |-- element: string (containsNull = true) +--------------+-----+---------+-------------+--------------+ | dimensions|price|productId| productName| tags| +--------------+-----+---------+-------------+--------------+ |[12.5, 10, 12]| null| 1|ProductName 1|[tag 1, tag 2]| |[12.5, 10, 12]|10.01| 2| null|[tag 1, tag 2]| +--------------+-----+---------+-------------+--------------+ root
|-- dimensions: struct (nullable = true) | |-- height: double (nullable = true) | |-- length: long (nullable = true) | |-- package: array (nullable = true) | | |-- element: double (containsNull = true) | |-- width: long (nullable = true) |-- productId: long (nullable = true) |-- productName: string (nullable = true) +--------------------+---------+-------------+ | dimensions|productId| productName| +--------------------+---------+-------------+ |[12.5, 10, [10.0,...| 3|ProductName 3| +--------------------+---------+-------------+ root
|-- price: double (nullable = true) |-- productId: long (nullable = true) |-- productName: string (nullable = true) df = spark.read \
.format("json") \ .option("multiline","false") \ .schema(df_schema) \ .load(path)
# 1. получаем через Kafka Schema Registry REST API схему данных
# 2. записываем ее в переменную schema и далее: df_schema = StructType.fromJson(schema)
df.schema.json()
{
"fields": [ { "metadata": {}, "name": "dimensions", "nullable": true, "type": { "fields": [ {"metadata":{},"name":"height","nullable":true,"type":"double"}, {"metadata":{},"name":"length","nullable":true,"type":"long"}, {"metadata":{},"name":"width","nullable":true,"type":"long"} ], "type": "struct" } }, { "metadata": {}, "name": "price", "nullable": true, "type": "double" }, { "metadata": {}, "name": "productId", "nullable": true, "type": "long" }, { "metadata": {}, "name": "productName", "nullable": true, "type": "string" }, { "metadata": {}, "name": "tags", "nullable": true, "type": { "containsNull": true, "elementType": "string", "type": "array" } } ], "type": "struct" } как сохранить схему уже прочитанного DataFrame в JSON, затем использовать повторно
root
|-- taskId: string (nullable = true) |-- extOrderId: string (nullable = true) |-- taskStatus: string (nullable = true) |-- taskControlStatus: string (nullable = true) |-- documentVersion: long (nullable = true) |-- buId: long (nullable = true) |-- storeId: long (nullable = true) |-- priority: string (nullable = true) |-- created: struct (nullable = true) | |-- createdBy: string (nullable = true) | |-- created: string (nullable = true) |-- lastUpdateInformation: struct (nullable = true) | |-- updatedBy: string (nullable = true) | |-- updated: string (nullable = true) |-- customerId: string (nullable = true) |-- employeeId: string (nullable = true) |-- pointOfGiveAway: struct (nullable = true) | |-- selected: string (nullable = true) | |-- available: array (nullable = true) | | |-- element: string (containsNull = true) |-- dateOfGiveAway: string (nullable = true) |-- dateOfGiveAwayEnd: string (nullable = true) |-- pickingDeadline: string (nullable = true) |-- storageLocation: string (nullable = true) |-- currentStorageLocations: array (nullable = true) | |-- element: string (containsNull = true) |-- customerType: string (nullable = true) |-- comment: string (nullable = true) |-- totalAmount: double (nullable = true) |-- currency: string (nullable = true) |-- stockDecrease: boolean (nullable = true) |-- offline: boolean (nullable = true) |-- trackId: string (nullable = true) |-- transportationType: string (nullable = true) |-- stockRebook: boolean (nullable = true) |-- notificationStatus: string (nullable = true) |-- lines: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- lineId: string (nullable = true) | | |-- extOrderLineId: string (nullable = true) | | |-- productId: string (nullable = true) | | |-- lineStatus: string (nullable = true) | | |-- lineControlStatus: string (nullable = true) | | |-- orderedQuantity: double (nullable = true) | | |-- confirmedQuantity: double (nullable = true) | | |-- assignedQuantity: double (nullable = true) | | |-- pickedQuantity: double (nullable = true) | | |-- controlledQuantity: double (nullable = true) | | |-- allowedForGiveAwayQuantity: double (nullable = true) | | |-- givenAwayQuantity: double (nullable = true) | | |-- returnedQuantity: double (nullable = true) | | |-- sellingScheme: string (nullable = true) | | |-- stockSource: string (nullable = true) | | |-- productPrice: double (nullable = true) | | |-- lineAmount: double (nullable = true) | | |-- currency: string (nullable = true) | | |-- markingFlag: string (nullable = true) | | |-- operations: array (nullable = true) | | | |-- element: struct (containsNull = true) | | | | |-- operationId: string (nullable = true) | | | | |-- type: string (nullable = true) | | | | |-- reason: string (nullable = true) | | | | |-- quantity: double (nullable = true) | | | | |-- dmCodes: array (nullable = true) | | | | | |-- element: string (containsNull = true) | | | | |-- timeStamp: string (nullable = true) | | | | |-- updatedBy: string (nullable = true) | | |-- source: array (nullable = true) | | | |-- element: struct (containsNull = true) | | | | |-- type: string (nullable = true) | | | | |-- items: array (nullable = true) | | | | | |-- element: struct (containsNull = true) | | | | | | |-- assignedQuantity: double (nullable = true) |-- linkedObjects: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- objectType: string (nullable = true) | | |-- objectId: string (nullable = true) | | |-- objectStatus: string (nullable = true) | | |-- objectLines: array (nullable = true) | | | |-- element: struct (containsNull = true) | | | | |-- objectLineId: string (nullable = true) | | | | |-- taskLineId: string (nullable = true) У нас почти сразу же родилась мысль, что хорошо бы было поделиться им с сообществом, так как мы в Леруа Мерлен сами активно используем продукты Open Source, созданные и поддерживаемые энтузиастами со всего мира, и хотим не просто их использовать, но и контрибьютить обратно, участвуя в разработке Open Source продуктов и поддерживая сообщество. В настоящий момент мы решаем внутренние орг.вопросы по схеме выкладывания данного конвертера в Open Source и, уверен, что в ближайшее время поделимся с сообществом этой наработкой!
# Для JSON
df = spark.read.format(in_format)\ .option("multiline", "false")\ .schema(json_schema) \ .load(path) # Для parquet: df = spark.read.format(in_format)\ .load(path)
=========== Источник: habr.com =========== Похожие новости:
Блог компании Леруа Мерлен ), #_big_data, #_hranenie_dannyh ( Хранение данных ), #_hranilischa_dannyh ( Хранилища данных ), #_data_engineering |
|
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 12:36
Часовой пояс: UTC + 5