[Apache, Big Data, Hadoop, Data Engineering] Как Apache Spark 3.0 увеличивает производительность ваших SQL рабочих нагрузок (перевод)

Автор Сообщение
news_bot ®

Стаж: 6 лет 3 месяца
Сообщений: 27286

Создавать темы news_bot ® написал(а)
01-Июн-2021 14:34


Практически в каждом секторе, работающем со сложными данными, Spark "де-факто" быстро стал средой распределенных вычислений для команд на всех этапах жизненного цикла данных и аналитики. Одна из наиболее ожидаемых функций Spark 3.0 - это новая платформа Adaptive Query Execution (AQE), устраняющая проблемы, которые возникают при многих рабочих нагрузках Spark SQL. Они были задокументированы в начале 2018 года командой специалистов Intel и Baidu. Для более глубокого изучения фреймворка можно пройти наш обновленный курс по тюнингу производительности Apache Spark (Apache Spark Performance Tuning).Наш опыт работы с Workload XM, безусловно, подтверждает реальность и серьезность этих проблем.AQE был впервые представлен в Spark 2.4, но в Spark 3.0 и 3.1 он стал намного более развитым. Для начала, давайте посмотрим, какие проблемы решает AQE.Недостаток первоначальной архитектуры Catalyst На диаграмме ниже представлен вид распределенной обработки, которая происходит, когда вы выполняете простой group-by-count запрос с использованием DataFrames.
Spark определяет подходящее количество партиций для первого этапа, но для второго этапа использует по умолчанию "магическое число" - 200.И это плохо по трем причинам: 1. 200 вряд ли будет идеальным количеством партиций, а именно их количество является одним из критических факторов, влияющих на производительность;2. Если вы запишете результат этого второго этапа на диск, у вас может получиться 200 маленьких файлов;3. Оптимизация и ее отсутствие имеют косвенный эффект: если обработка должна продолжаться после второго этапа, то вы можете упустить потенциальные возможности дополнительной оптимизации.Что можно сделать? Вручную установить значение этого свойства перед выполнением запроса с помощью такого оператора:spark.conf.set(“spark.sql.shuffle.partitions”,”2″)Но это также создает некоторые проблемы:
  • Задавать данный параметр перед каждым запросом утомительно.
  • Эти значения станут устаревшими по мере эволюции ваших данных.
  • Этот параметр будет применяться ко всем шаффлингах в вашем запросе.
Перед первым этапом в предыдущем примере известно распределение и объем данных, и Spark может предложить разумное значение для количества партиций. Однако для второго этапа эта информация еще не известна, так как цена, которую нужно заплатить за ее получение, - это выполнение фактической обработки на первом этапе: отсюда и обращение к магическому числу.Принцип работы Adaptive Query ExecutionОсновная идея AQE состоит в том, чтобы сделать план выполнения не окончательным и перепроверять статус после каждого этапа. Таким образом, план выполнения разбивается на новые абстракции «этапов запроса», разделенных этапами.Catalyst теперь останавливается на границе каждого этапа, чтобы попытаться применить дополнительную оптимизацию с учетом информации, доступной на основе промежуточных данных.Поэтому AQE можно определить как слой поверх Spark Catalyst, который будет изменять план Spark "на лету".Есть недостатки? Некоторые есть, но они второстепенные:
  • Выполнение останавливается на границе каждого этапа, чтобы Spark проверил свой план, но это компенсируется увеличением производительности.
  • Пользовательский интерфейс Spark труднее читать, потому что Spark создает для данного приложения больше заданий, и эти задания не подхватывают группу заданий и описание, которое вы задали.
Адаптивное количество перемешиваемых партицийЭта функция AQE доступна, начиная с версии Spark 2.4.Чтобы включить ее, вам нужно установить для spark.sql.adaptive.enabled значение true, значение по умолчанию - false. Когда AQE включено, количество партиций в случайном порядке регулируется автоматически и больше не равно 200 по умолчанию или заданному вручную значению.Вот как выглядит выполнение первого запроса TPC-DS до и после включения AQE:
Динамическая конвертация Sort Merge Joins в Broadcast JoinsAQE преобразует соединения sort-merge в broadcast хэш-соединения, если статистика времени выполнения любой из сторон соединения меньше порога broadcast хэш-соединения.Вот как выглядят последние этапы выполнения второго запроса TPC-DS до и после включения AQE:
Динамическое объединение shuffle партицийЕсли количество разделов в случайном порядке больше, чем количество групп по ключам, то много циклов ЦП теряется из-за несбалансированного распределения ключей.Когда оба: ·         spark.sql.adaptive.enabled и·         spark.sql.adaptive.coalescePartitions.enabled установлены на true, Spark объединит смежные перемешанные разделы в соответствии с целевым размером, указанным в spark.sql.adaptive.advisoryPartitionSizeInBytes. Это делается, чтобы избежать слишком большого количества мелких задач.
Динамическая оптимизация обьединений с перекосомSkew (перекос) - это камень преткновения распределенной обработки. Это может задержать обработку буквально на несколько часов:
Без оптимизации время, необходимое для выполнения объединения, будет определяться самым большим разделом.
Оптимизация skew join, таким образом, разобъет раздел A0 на подразделы, используя значение, указанное park.sql.adaptive.advisoryPartitionSizeInBytes, и присоединит каждый из них к соответствующему разделу B0 таблицы B.
Следовательно, вам необходимо предоставить AQE свое определение перекоса.Это включает в себя два параметра:1.   spark.sql.adaptive.skewJoin.skewedPartitionFactor является относительным: партиция считается с пересом, если ее размер больше, чем этот коэффициент, умноженный на средний размер партиции, а также, если он больше, чем2.   spark.sql.adaptive.skewedPartitionThresholdInBytes, который является абсолютным: это порог, ниже которого перекос будет игнорироваться.Динамическое сокращение разделовИдея динамического сокращения разделов (dynamic partition pruning, DPP) - один из наиболее эффективных методов оптимизации: считываются только те данные, которые вам нужны. Если в вашем запросе есть DPP, то AQE не запускается. DPP было перенесено в Spark 2.4 для CDP.Эта оптимизация реализована как на логическом, так и на физическом уровне.1.   На логическом уровне фильтр размера идентифицируется и распространяется через обьединение на другую часть сканирования.2.   Затем на физическом уровне фильтр выполняется один раз в части измерения, и результат транслируется в основную таблицу, где также применяется фильтр.
DPP в действительности может работать с другими типами обьединений (например, SortMergeJoin), если вы отключите spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly.В этом случае Spark оценит, действительно ли фильтр DPP улучшает производительность запроса.DPP может привести к значительному увеличению производительности высокоселективных запросов, например, если ваш запрос фильтрует данные за один месяц из выборки за 5 лет.
Не все запросы получают такой впечатляющий прирост производительности, но 72 из 99 запросов TPC-DS положительно влияют на DPP.Заключение  Spark прошел долгий путь от своей первоначальной базовой парадигмы: неспешного выполнения оптимизированного статического плана для статического набора данных.Анализ статического набора данных был пересмотрен из-за потоковой передачи: команда Spark сначала создала довольно неуклюжий дизайн на основе RDD, прежде чем придумать лучшее решение с использованием DataFrames.Часть статического плана подверглась сомнению со стороны SQL, а структура адаптивного выполнения запросов в некотором роде - это то, чем является структурированная потоковая передача для исходной потоковой библиотеки: элегантное решение, которым она должна была быть с самого начала.Благодаря фреймворку AQE, DPP, усиленной поддержке графических процессоров и Kubernetes перспективы увеличения производительности теперь весьма многообещающие, поэтому мы и наблюдаем повсеместный переход на Spark 3.1
===========
Источник:
habr.com
===========

===========
Автор оригинала: Francois Reynald
===========
Похожие новости: Теги для поиска: #_apache, #_big_data, #_hadoop, #_data_engineering, #_apache_spark, #_performance_optimization, #_spark3, #_data_inzhener (дата инженер), #_cloudera, #_hortonworks, #_blog_kompanii_cloudera (
Блог компании Cloudera
)
, #_apache, #_big_data, #_hadoop, #_data_engineering
Профиль  ЛС 
Показать сообщения:     

Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы

Текущее время: 16-Май 01:19
Часовой пояс: UTC + 5