[Программирование, SQL, Data Engineering] Секреты производительности Spark, или Почему важна компиляция запросов (перевод)

Автор Сообщение
news_bot ®

Стаж: 6 лет 9 месяцев
Сообщений: 27286

Создавать темы news_bot ® написал(а)
24-Ноя-2020 20:31
Для будущих студентов курсов "Data Engineer" и "Экосистема Hadoop, Spark, Hive" подготовили еще один перевод полезной статьи.
Criteo — это компания, работа которой основана на данных. Каждый день через наши системы проходят десятки терабайт новых данных для обучения моделей рекомендаций, обрабатывающих запросы в масштабах всего Интернета. Spark — наше основное средство обработки больших данных. Это мощный и гибкий инструмент, однако он отличается довольно высокой сложностью в освоении, а чтобы пользоваться им эффективно, зачастую требуется читать исходный код платформы.Быстрая обработка больших данных имеет критическое значение для нашего бизнеса:
  • мы часто обновляем наши модели, повышая их производительность для наших клиентов;
  • мы быстро выводим новые продукты на базе машинного обучения на рынок за счет того, что можем быстро выполнять итерации;
  • от скорости обработки данных зависят затраты на инфраструктуру.
В этой статье я расскажу о написании эффективного кода Spark и на примерах продемонстрирую распространенные подводные камни. Я покажу, что в большинстве случаев Spark SQL (Datasets) следует отдавать предпочтение перед Spark Core API (RDD), и если сделать правильный выбор, можно повысить производительность обработки больших данных в 2–10 раз, а это очень значимо.Конфигурация для экспериментовSpark 2.4.6, Macbook Pro 2017 с процессором Intel Core i7 с частотой 3,5 ГГцИзмерения всегда производятся на разогретой виртуальной Java-машине (выполняется 100 прогонов кода, и берется среднее значение за последние 90 прогонов). Приведенный в этой статье код написан на Scala, но ее выводы должны быть справедливыми и для Python.Заблуждения, связанные с обработкой больших данныхСуществует распространенное мнение, что в процессах обработки больших данных есть два основных узких места, влияющих на производительность:
  • перетасовка данных, поскольку для ее выполнения требуется отправлять данные по сети;
  • дисковый ввод-вывод, поскольку доступ к данным на диске всегда намного медленнее, чем доступ к данным в ОЗУ.
Эти представления имеют под собой исторические основания — в 2006 году, когда впервые появилась библиотека Hadoop, обычные жесткие диски были медленными и ненадежными, а основной платформой для обработки больших данных была MapReduce. Именно медленная работа жестких дисков и подстегнула разработку таких средств обработки в памяти, как Spark. С того времени характеристики аппаратного обеспечения значительно улучшились.В 2015 году в исследовании Кей Остерхаут (Kay Ousterhout) и др.¹ были проанализированы узкие места в заданиях Spark, и в результате выяснилось, что скорость их выполнения в большей степени определяется операциями, загружающими ЦП, а не вводом-выводом и передачей данных по сети. В частности, авторами этой научной работы был выполнен широкий спектр запросов к трем тестовым наборам данных, включая TPC-DS², и было определено, что:
  • если бы пропускная способность сети была безграничной, время выполнения заданий можно было бы сократить на 2 % (медианное значение);
  • если бы пропускная способность дискового ввода-вывода была безграничной, время выполнения стандартного аналитического процесса можно было бы сократить на 19 % (медианное значение).
Весьма неожиданный результат! Получается, что дисковый ввод-вывод оказывает намного большее влияние на производительность, чем передача данных по сети. Этому есть несколько причин:
  • Spark использует дисковый ввод-вывод не только при считывании входного набора данных и записи результата, но и в ходе выполнения заданий для кэширования и переноса на диск данных, которые не умещаются в ОЗУ.
  • При выполнении аналитических заданий часто требуется производить агрегацию, поэтому объем данных, передаваемых по сети, обычно меньше, чем объем данных, которые первоначально считываются с диска.
Интересно, что специалисты Databricks примерно в 2016 году³ пришли к таким же заключениям, что заставило их переориентировать вектор развития Spark на оптимизацию использования процессора. Результатом стало внедрение поддержки SQL, а также API DataFrames и позднее Datasets.Насколько быстро работает Spark?Давайте рассмотрим простую задачу — посчитаем наивным методом четные числа от 0 до 10⁹. Для выполнения такого задания Spark, в принципе, не требуется, поэтому для начала напишем простую программу на Scala:
var res: Long = 0L
var i: Long  = 0L
while (i < 1000L * 1000 * 1000) {
  if (i % 2 == 0) res += 1
  i += 1L
}
Листинг 1. Наивный подсчетА теперь давайте также вычислим этот же результат с помощью Spark RDD и Spark Datasets. Чтобы эксперимент был честным, я запускаю Spark в локальном[1] режиме:
val res = spark.sparkContext
  .range(0L, 1000L * 1000 * 1000)
  .filter(_ % 2 == 0)
  .count()
Листинг 2. Подсчет с помощью RDD
val res = spark.range(1000L * 1000 * 1000)
  .filter(col("id") % 2 === 0)
  .select(count(col("id")))
  .first().getAs[Long](0)
Листинг 3. Подсчет с помощью DatasetsВремя выполнения всех фрагментов кода приведено ниже. Неудивительно, что написанный вручную код является самым эффективным решением. Удивительно же то, что RDD в пять раз медленнее, тогда как у Datasets время вычисления почти такое же, как у написанного вручную кода. 
Парадокс DatasetsПарадокс: API-интерфейс Datasets построен на основе RDD, однако работает намного быстрее, почти так же быстро, как код, написанный вручную для конкретной задачи. Как такое вообще возможно? Дело в новой модели выполнения.Прошлое — модель VolcanoКод, написанный с использованием RDD, выполняется с помощью модели выполнения Volcano. На практике это означает, что каждый RDD следует стандартному интерфейсу:
  • знает свой родительский RDD;
  • предоставляет посредством метода compute доступ к итератору Iterator[T], который перебирает элементы данного RDD (он является private и должен использоваться только разработчиками Spark).
abstract class RDD[T: ClassTag]
def compute(…): Iterator[T]
Листинг 4. RDD.scalaС учетом этих свойств упрощенная версия реализации функции подсчета для RDD, которая игнорирует разбиение, выглядит вот так:
def pseudo_rdd_count(rdd: RDD[T]): Long = {
  val iter = rdd.compute
  var result = 0
  while (iter.hasNext) result += 1
  result
}
Листинг 5. Псевдокод для действия подсчета на основе RDDПочему этот код работает значительно медленнее, чем написанный вручную код, который приведен в листинге 1? Есть несколько причин:
  • Вызовы итераторов виртуальной функцией: вызовы Iterator.next() несут дополнительную нагрузку по сравнению с функциями, не являющимися виртуальными, которые могут выполняться компилятором или JIT как встроенные (inline).
  • Отсутствие оптимизации на уровне ЦП: виртуальная Java-машина и JIT не могут оптимизировать байт-код, образуемый листингом 5, так же хорошо, как байт-код, получаемый при использовании листинга 1. В частности, написанный вручную код позволяет виртуальной Java-машине и JIT хранить промежуточные результаты вычислений в регистре ЦП, а не помещать их в основную память.
Настоящее — формирование кода всего этапаКод, написанный с помощью Spark SQL⁵, выполняется не так, как код, написанный с использованием RDD. Когда запускается действие, Spark генерирует код, который сворачивает несколько трансформаций данных в одну функцию. Этот процесс называется формированием кода всего этапа (Whole-Stage Code Generation)⁶. Spark пытается воспроизвести процесс написания специального кода для конкретной задачи, в котором не используются вызовы виртуальных функций. Такой код может выполняться JVM/JIT более эффективно. На самом деле Spark генерирует довольно много кода, см., например, код Spark для листинга 3.Технически Spark только формирует высокоуровневый код, а генерация байт-кода выполняется компилятором Janino⁴. Именно это и делает Spark SQL настолько быстрым по сравнению с RDD.Эффективное использование SparkСегодня в Spark есть 3 API-интерфейса Scala/Java: RDD, Datasets и DataFrames (который теперь объединен с Datasets). RDD все еще широко применяется в Spark — в частности, из-за того, что этот API используется большинством созданных ранее заданий, и перспектива «продолжать в том же духе» весьма заманчива. Однако, как показывают тесты, переход на API-интерфейс Datasets может дать громадный прирост производительности за счет оптимизированного использования ЦП.Неправильный подход — классический способСамая распространенная проблема, с которой я сталкивался при использовании Spark SQL, это явное переключение на API RDD. Причина состоит в том, что программисту зачастую проще сформулировать вычисление в терминах объектов Java, чем с помощью ограниченного языка Spark SQL:
val res = spark.range(1000L * 1000 * 1000)
    .rdd
    .filter(_ %2 == 0)
    .count()
Листинг 6. Переключение с Dataset на RDDЭтот код выполняется в течение 43 секунд вместо исходных 2,1 секунды, при этом делая абсолютно то же самое. Явное переключение на RDD останавливает формирование кода всего этапа и запускает преобразование элементов наборов данных из примитивных типов в объекты Java, что оказывается очень затратным. Если мы сравним схемы этапов выполнения кода из листингов 3 и 6 (см. ниже), то увидим, что во втором случае появляется дополнительный этап. 
Рисунок 1. Визуальные представления этапов для листинга 3 (схема a) и листинга 6 (схема b)Неправильный подход — изысканный способПроизводительность Spark SQL является на удивление хрупкой. Это незначительное изменение приводит к увеличению времени выполнения запроса в три раза (до 6 секунд):
val res = spark
  .range(1000L * 1000 * 1000)
  .filter(x => x % 2 == 0) // note that the condition changed
  .select(count(col("id")))
  .first()
  .getAs[Long](0)
Листинг 7. Замена выражения Spark SQL функцией ScalaSpark не способен генерировать эффективный код для условия в фильтре. Условие является анонимной функцией Scala, а не выражением Spark SQL, и Spark выполнит десериализацию каждой записи из оптимизированного внутреннего представления, чтобы вызвать эту функцию. Причем вот что примечательно — это изменение никак не сказывается на визуальном представлении этапов (рис. 1a), поэтому его невозможно обнаружить, анализируя направленный ациклический граф (DAG) задания в пользовательском интерфейсе Spark.Высокая производительность Spark SQL обеспечивается за счет ограничения круга доступных операций — чем-то все равно приходится жертвовать! Чтобы получить максимальную производительность, нужно использовать преобразования, которые работают со столбцами: используйте filter(condition: Column) вместо filter(T => Boolean) и select(…) вместо map(…). При этом Spark не придется перестраивать объект, представленный одной строкой набора данных (Dataset). И, разумеется, избегайте переключения на RDD.Заключение и итоговые замечанияПриведенные в этой статье простые примеры демонстрируют, что большая часть времени выполнения заданий обработки больших данных не тратится на полезную работу. Хорошим решением этой проблемы является компиляция запросов, которая возможна с использованием Spark SQL и обеспечивает более эффективное использование современного аппаратного обеспечения. Последние исследования свидетельствуют, что использование эффективных запросов для стандартных процессов обработки больших данных важнее, чем оптимизация использования сети и дискового ввода-вывода.Правильное применение компиляции запросов может сократить время обработки в 2–10 раз, а это означает ускорение экспериментов, снижение затрат на инфраструктуру и громадное удовольствие от элегантного выполнения своей работы!Образцы кода из этой статьи можно найти здесь. С помощью этого репозитория можно анализировать производительность разных запросов Spark.Использованные материалы 4. https://janino-compiler.github.io/janino/5. http://people.csail.mit.edu/matei/papers/2015/sigmodsparksql.pdf6. https://databricks.com/blog/2016/05/23/apache-spark-as-a-compiler-joining-a-billion-rows-per-second-on-a-laptop.html
Узнать подробнее о курсах "Data Engineer" и "Экосистема Hadoop, Spark, Hive".

===========
Источник:
habr.com
===========

===========
Автор оригинала: Victor Zaytsev
===========
Похожие новости: Теги для поиска: #_programmirovanie (Программирование), #_sql, #_data_engineering, #_spark, #_obrabotka_dannyh (обработка данных), #_spark_sql, #_spark_core_api, #_blog_kompanii_otus._onlajnobrazovanie (
Блог компании OTUS. Онлайн-образование
)
, #_programmirovanie (
Программирование
)
, #_sql, #_data_engineering
Профиль  ЛС 
Показать сообщения:     

Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы

Текущее время: 22-Ноя 22:14
Часовой пояс: UTC + 5