[Программирование, SQL, Data Engineering] Секреты производительности Spark, или Почему важна компиляция запросов (перевод)
Автор
Сообщение
news_bot ®
Стаж: 6 лет 9 месяцев
Сообщений: 27286
Для будущих студентов курсов "Data Engineer" и "Экосистема Hadoop, Spark, Hive" подготовили еще один перевод полезной статьи.
Criteo — это компания, работа которой основана на данных. Каждый день через наши системы проходят десятки терабайт новых данных для обучения моделей рекомендаций, обрабатывающих запросы в масштабах всего Интернета. Spark — наше основное средство обработки больших данных. Это мощный и гибкий инструмент, однако он отличается довольно высокой сложностью в освоении, а чтобы пользоваться им эффективно, зачастую требуется читать исходный код платформы.Быстрая обработка больших данных имеет критическое значение для нашего бизнеса:
- мы часто обновляем наши модели, повышая их производительность для наших клиентов;
- мы быстро выводим новые продукты на базе машинного обучения на рынок за счет того, что можем быстро выполнять итерации;
- от скорости обработки данных зависят затраты на инфраструктуру.
В этой статье я расскажу о написании эффективного кода Spark и на примерах продемонстрирую распространенные подводные камни. Я покажу, что в большинстве случаев Spark SQL (Datasets) следует отдавать предпочтение перед Spark Core API (RDD), и если сделать правильный выбор, можно повысить производительность обработки больших данных в 2–10 раз, а это очень значимо.Конфигурация для экспериментовSpark 2.4.6, Macbook Pro 2017 с процессором Intel Core i7 с частотой 3,5 ГГцИзмерения всегда производятся на разогретой виртуальной Java-машине (выполняется 100 прогонов кода, и берется среднее значение за последние 90 прогонов). Приведенный в этой статье код написан на Scala, но ее выводы должны быть справедливыми и для Python.Заблуждения, связанные с обработкой больших данныхСуществует распространенное мнение, что в процессах обработки больших данных есть два основных узких места, влияющих на производительность:
- перетасовка данных, поскольку для ее выполнения требуется отправлять данные по сети;
- дисковый ввод-вывод, поскольку доступ к данным на диске всегда намного медленнее, чем доступ к данным в ОЗУ.
Эти представления имеют под собой исторические основания — в 2006 году, когда впервые появилась библиотека Hadoop, обычные жесткие диски были медленными и ненадежными, а основной платформой для обработки больших данных была MapReduce. Именно медленная работа жестких дисков и подстегнула разработку таких средств обработки в памяти, как Spark. С того времени характеристики аппаратного обеспечения значительно улучшились.В 2015 году в исследовании Кей Остерхаут (Kay Ousterhout) и др.¹ были проанализированы узкие места в заданиях Spark, и в результате выяснилось, что скорость их выполнения в большей степени определяется операциями, загружающими ЦП, а не вводом-выводом и передачей данных по сети. В частности, авторами этой научной работы был выполнен широкий спектр запросов к трем тестовым наборам данных, включая TPC-DS², и было определено, что:
- если бы пропускная способность сети была безграничной, время выполнения заданий можно было бы сократить на 2 % (медианное значение);
- если бы пропускная способность дискового ввода-вывода была безграничной, время выполнения стандартного аналитического процесса можно было бы сократить на 19 % (медианное значение).
Весьма неожиданный результат! Получается, что дисковый ввод-вывод оказывает намного большее влияние на производительность, чем передача данных по сети. Этому есть несколько причин:
- Spark использует дисковый ввод-вывод не только при считывании входного набора данных и записи результата, но и в ходе выполнения заданий для кэширования и переноса на диск данных, которые не умещаются в ОЗУ.
- При выполнении аналитических заданий часто требуется производить агрегацию, поэтому объем данных, передаваемых по сети, обычно меньше, чем объем данных, которые первоначально считываются с диска.
Интересно, что специалисты Databricks примерно в 2016 году³ пришли к таким же заключениям, что заставило их переориентировать вектор развития Spark на оптимизацию использования процессора. Результатом стало внедрение поддержки SQL, а также API DataFrames и позднее Datasets.Насколько быстро работает Spark?Давайте рассмотрим простую задачу — посчитаем наивным методом четные числа от 0 до 10⁹. Для выполнения такого задания Spark, в принципе, не требуется, поэтому для начала напишем простую программу на Scala:
var res: Long = 0L
var i: Long = 0L
while (i < 1000L * 1000 * 1000) {
if (i % 2 == 0) res += 1
i += 1L
}
Листинг 1. Наивный подсчетА теперь давайте также вычислим этот же результат с помощью Spark RDD и Spark Datasets. Чтобы эксперимент был честным, я запускаю Spark в локальном[1] режиме:
val res = spark.sparkContext
.range(0L, 1000L * 1000 * 1000)
.filter(_ % 2 == 0)
.count()
Листинг 2. Подсчет с помощью RDD
val res = spark.range(1000L * 1000 * 1000)
.filter(col("id") % 2 === 0)
.select(count(col("id")))
.first().getAs[Long](0)
Листинг 3. Подсчет с помощью DatasetsВремя выполнения всех фрагментов кода приведено ниже. Неудивительно, что написанный вручную код является самым эффективным решением. Удивительно же то, что RDD в пять раз медленнее, тогда как у Datasets время вычисления почти такое же, как у написанного вручную кода.
Парадокс DatasetsПарадокс: API-интерфейс Datasets построен на основе RDD, однако работает намного быстрее, почти так же быстро, как код, написанный вручную для конкретной задачи. Как такое вообще возможно? Дело в новой модели выполнения.Прошлое — модель VolcanoКод, написанный с использованием RDD, выполняется с помощью модели выполнения Volcano. На практике это означает, что каждый RDD следует стандартному интерфейсу:
- знает свой родительский RDD;
- предоставляет посредством метода compute доступ к итератору Iterator[T], который перебирает элементы данного RDD (он является private и должен использоваться только разработчиками Spark).
abstract class RDD[T: ClassTag]
def compute(…): Iterator[T]
Листинг 4. RDD.scalaС учетом этих свойств упрощенная версия реализации функции подсчета для RDD, которая игнорирует разбиение, выглядит вот так:
def pseudo_rdd_count(rdd: RDD[T]): Long = {
val iter = rdd.compute
var result = 0
while (iter.hasNext) result += 1
result
}
Листинг 5. Псевдокод для действия подсчета на основе RDDПочему этот код работает значительно медленнее, чем написанный вручную код, который приведен в листинге 1? Есть несколько причин:
- Вызовы итераторов виртуальной функцией: вызовы Iterator.next() несут дополнительную нагрузку по сравнению с функциями, не являющимися виртуальными, которые могут выполняться компилятором или JIT как встроенные (inline).
- Отсутствие оптимизации на уровне ЦП: виртуальная Java-машина и JIT не могут оптимизировать байт-код, образуемый листингом 5, так же хорошо, как байт-код, получаемый при использовании листинга 1. В частности, написанный вручную код позволяет виртуальной Java-машине и JIT хранить промежуточные результаты вычислений в регистре ЦП, а не помещать их в основную память.
Настоящее — формирование кода всего этапаКод, написанный с помощью Spark SQL⁵, выполняется не так, как код, написанный с использованием RDD. Когда запускается действие, Spark генерирует код, который сворачивает несколько трансформаций данных в одну функцию. Этот процесс называется формированием кода всего этапа (Whole-Stage Code Generation)⁶. Spark пытается воспроизвести процесс написания специального кода для конкретной задачи, в котором не используются вызовы виртуальных функций. Такой код может выполняться JVM/JIT более эффективно. На самом деле Spark генерирует довольно много кода, см., например, код Spark для листинга 3.Технически Spark только формирует высокоуровневый код, а генерация байт-кода выполняется компилятором Janino⁴. Именно это и делает Spark SQL настолько быстрым по сравнению с RDD.Эффективное использование SparkСегодня в Spark есть 3 API-интерфейса Scala/Java: RDD, Datasets и DataFrames (который теперь объединен с Datasets). RDD все еще широко применяется в Spark — в частности, из-за того, что этот API используется большинством созданных ранее заданий, и перспектива «продолжать в том же духе» весьма заманчива. Однако, как показывают тесты, переход на API-интерфейс Datasets может дать громадный прирост производительности за счет оптимизированного использования ЦП.Неправильный подход — классический способСамая распространенная проблема, с которой я сталкивался при использовании Spark SQL, это явное переключение на API RDD. Причина состоит в том, что программисту зачастую проще сформулировать вычисление в терминах объектов Java, чем с помощью ограниченного языка Spark SQL:
val res = spark.range(1000L * 1000 * 1000)
.rdd
.filter(_ %2 == 0)
.count()
Листинг 6. Переключение с Dataset на RDDЭтот код выполняется в течение 43 секунд вместо исходных 2,1 секунды, при этом делая абсолютно то же самое. Явное переключение на RDD останавливает формирование кода всего этапа и запускает преобразование элементов наборов данных из примитивных типов в объекты Java, что оказывается очень затратным. Если мы сравним схемы этапов выполнения кода из листингов 3 и 6 (см. ниже), то увидим, что во втором случае появляется дополнительный этап.
Рисунок 1. Визуальные представления этапов для листинга 3 (схема a) и листинга 6 (схема b)Неправильный подход — изысканный способПроизводительность Spark SQL является на удивление хрупкой. Это незначительное изменение приводит к увеличению времени выполнения запроса в три раза (до 6 секунд):
val res = spark
.range(1000L * 1000 * 1000)
.filter(x => x % 2 == 0) // note that the condition changed
.select(count(col("id")))
.first()
.getAs[Long](0)
Листинг 7. Замена выражения Spark SQL функцией ScalaSpark не способен генерировать эффективный код для условия в фильтре. Условие является анонимной функцией Scala, а не выражением Spark SQL, и Spark выполнит десериализацию каждой записи из оптимизированного внутреннего представления, чтобы вызвать эту функцию. Причем вот что примечательно — это изменение никак не сказывается на визуальном представлении этапов (рис. 1a), поэтому его невозможно обнаружить, анализируя направленный ациклический граф (DAG) задания в пользовательском интерфейсе Spark.Высокая производительность Spark SQL обеспечивается за счет ограничения круга доступных операций — чем-то все равно приходится жертвовать! Чтобы получить максимальную производительность, нужно использовать преобразования, которые работают со столбцами: используйте filter(condition: Column) вместо filter(T => Boolean) и select(…) вместо map(…). При этом Spark не придется перестраивать объект, представленный одной строкой набора данных (Dataset). И, разумеется, избегайте переключения на RDD.Заключение и итоговые замечанияПриведенные в этой статье простые примеры демонстрируют, что большая часть времени выполнения заданий обработки больших данных не тратится на полезную работу. Хорошим решением этой проблемы является компиляция запросов, которая возможна с использованием Spark SQL и обеспечивает более эффективное использование современного аппаратного обеспечения. Последние исследования свидетельствуют, что использование эффективных запросов для стандартных процессов обработки больших данных важнее, чем оптимизация использования сети и дискового ввода-вывода.Правильное применение компиляции запросов может сократить время обработки в 2–10 раз, а это означает ускорение экспериментов, снижение затрат на инфраструктуру и громадное удовольствие от элегантного выполнения своей работы!Образцы кода из этой статьи можно найти здесь. С помощью этого репозитория можно анализировать производительность разных запросов Spark.Использованные материалы
- Ousterhout, Kay, et al. Making sense of performance in data analytics frameworks (Анализ производительности платформ анализа данных). 12-й симпозиум {USENIX} по проектированию и реализации сетевых систем ({NSDI} 15). 2015.
- http://www.tpc.org/tpcds/
- https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html
4. https://janino-compiler.github.io/janino/5. http://people.csail.mit.edu/matei/papers/2015/sigmodsparksql.pdf6. https://databricks.com/blog/2016/05/23/apache-spark-as-a-compiler-joining-a-billion-rows-per-second-on-a-laptop.html
===========
Источник:
habr.com
===========
===========
Автор оригинала: Victor Zaytsev
===========Похожие новости:
- [Разработка под iOS, Разработка мобильных приложений, Swift] SwiftUI 2.0: будущее декларативно (перевод)
- [Программирование, Разработка игр, C#, Unity] Базовые концепции Unity для программистов (перевод)
- [Программирование, Софт] Понятность ПО: самый важный показатель, который вы не отслеживаете (перевод)
- [MySQL, Go] Пишем CRUD-приложение на Go с помощью Mysql, GORM, Echo, Clean Architecture (перевод)
- [Программирование, Алгоритмы, DIY или Сделай сам] Определяем язык текста. Сложный случай
- [Программирование, Kubernetes] Руководство по настройке целевых уровней обслуживания (SLO) в Kubernetes с помощью Prometheus и Linkerd (перевод)
- [Программирование, Анализ и проектирование систем, Big Data] Применение микросервисной архитектуры в потоковой обработке Big Data
- [Программирование, Серверное администрирование, ООП, Машинное обучение] Как можно сэкономить на онлайн-курсах и сделать обучение эффективнее
- [Java, NoSQL, Big Data, Машинное обучение, Natural Language Processing] Кластеризация и классификация больших Текстовых данных с помощью машинного обучения на Java. Статья #2 — Алгоритмы
- [.NET, ASP, API, Тестирование веб-сервисов] Тестируем веб-API ASP.NET Core (перевод)
Теги для поиска: #_programmirovanie (Программирование), #_sql, #_data_engineering, #_spark, #_obrabotka_dannyh (обработка данных), #_spark_sql, #_spark_core_api, #_blog_kompanii_otus._onlajnobrazovanie (
Блог компании OTUS. Онлайн-образование
), #_programmirovanie (
Программирование
), #_sql, #_data_engineering
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 22:14
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
Для будущих студентов курсов "Data Engineer" и "Экосистема Hadoop, Spark, Hive" подготовили еще один перевод полезной статьи.
var res: Long = 0L
var i: Long = 0L while (i < 1000L * 1000 * 1000) { if (i % 2 == 0) res += 1 i += 1L } val res = spark.sparkContext
.range(0L, 1000L * 1000 * 1000) .filter(_ % 2 == 0) .count() val res = spark.range(1000L * 1000 * 1000)
.filter(col("id") % 2 === 0) .select(count(col("id"))) .first().getAs[Long](0) Парадокс DatasetsПарадокс: API-интерфейс Datasets построен на основе RDD, однако работает намного быстрее, почти так же быстро, как код, написанный вручную для конкретной задачи. Как такое вообще возможно? Дело в новой модели выполнения.Прошлое — модель VolcanoКод, написанный с использованием RDD, выполняется с помощью модели выполнения Volcano. На практике это означает, что каждый RDD следует стандартному интерфейсу:
abstract class RDD[T: ClassTag]
def compute(…): Iterator[T] def pseudo_rdd_count(rdd: RDD[T]): Long = {
val iter = rdd.compute var result = 0 while (iter.hasNext) result += 1 result }
val res = spark.range(1000L * 1000 * 1000)
.rdd .filter(_ %2 == 0) .count() Рисунок 1. Визуальные представления этапов для листинга 3 (схема a) и листинга 6 (схема b)Неправильный подход — изысканный способПроизводительность Spark SQL является на удивление хрупкой. Это незначительное изменение приводит к увеличению времени выполнения запроса в три раза (до 6 секунд): val res = spark
.range(1000L * 1000 * 1000) .filter(x => x % 2 == 0) // note that the condition changed .select(count(col("id"))) .first() .getAs[Long](0)
=========== Источник: habr.com =========== =========== Автор оригинала: Victor Zaytsev ===========Похожие новости:
Блог компании OTUS. Онлайн-образование ), #_programmirovanie ( Программирование ), #_sql, #_data_engineering |
|
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 22:14
Часовой пояс: UTC + 5