[Python, SQL, Big Data, SQLite, Data Engineering] PySpark. Решаем задачу на поиск сессий

Автор Сообщение
news_bot ®

Стаж: 6 лет 2 месяца
Сообщений: 27286

Создавать темы news_bot ® написал(а)
07-Мар-2021 12:31

Добрый день уважаемые читатели! Несколько дней назад перечитывая книгу Энтони Молинаро “SQL. Сборник рецептов”, в одной из глав я наткнулся на тему, которая была посвящена определению начала и конца диапазона последовательных значений. Бегло ознакомившись с материалом, я сразу вспомнил, что уже сталкивался с данным вопросом в качестве одного из тестовых заданий, но тогда тема была заявлена как “Задача на поиск сессий”. Фишкой технического собеседования был не разбор выполненной работы, а один из вопросов интервьюера о том, как получить аналогичные значения с помощью Spark. Готовясь к собеседованию, я не знал, что в компании применяется (а может и не применяется…) Apache Spark, и поэтому не собрал информацию по новому на тот момент для меня инструменту. Оставалось лишь выдвинуть гипотезу, что искомое решение может быть подобно скрипту, который можно написать c помощью библиотеки Pandas. Хотя очень отдалено я все-таки попал в цель, однако поработать в данной организации не получилось.Справедливости ради хочу заметить, что за прошедшие годы я несильно продвинулся в изучении Apache Spark. Но я все равно хочу поделиться с читателями наработками, так как многие аналитики вообще не сталкивались с этим инструментом, а другим возможно предстоит подобное собеседование. Если вы являетесь профессионалом Spark, то всегда можно предложить более оптимальный код в комментариях к публикации.Это была преамбула, приступим непосредственно к разбору данной темы. Пойдем сначала и напишем SQL скрипт. Но прежде создадим базу данных и заполним ее значениями. Так как это демо-пример предлагаю использовать SQLite. Данная БД уступает более мощным “коллегам по цеху”, но ее возможностей для разработки скрипта нам хватит сполна. Чтобы автоматизировать заявленные выше операции, я написал вот такой код на Python.
# Импорт библиотек
import sqlite3
# Данные для записи в БД
projects = [
    ('2020-01-01', '2020-01-02'),
    ('2020-01-02', '2020-01-03'),
    ('2020-01-03', '2020-01-04'),
    ('2020-01-04', '2020-01-05'),
    ('2020-01-06', '2020-01-07'),
    ('2020-01-16', '2020-01-17'),
    ('2020-01-17', '2020-01-18'),
    ('2020-01-18', '2020-01-19'),
    ('2020-01-19', '2020-01-20'),
    ('2020-01-21', '2020-01-22'),
    ('2020-01-26', '2020-01-27'),
    ('2020-01-27', '2020-01-28'),
    ('2020-01-28', '2020-01-29'),
    ('2020-01-29', '2020-01-30')
]
try:
    # Создаем соединение
    con = sqlite3.connect("projects.sqlite")
    # Создаем курсор
    cur = con.cursor()
    # Создаем таблицу
    cur.execute("""CREATE TABLE IF NOT EXISTS projects (
                    proj_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    proj_start TEXT,
                    proj_end TEXT)""")
    # Добавляем записи
    cur.executemany("INSERT INTO projects VALUES(NULL, ?,?)", projects)
    # Сохраняем транзакцию
    con.commit()
    # Закрываем курсор
    cur.close()
except sqlite3.Error as err:
    print("Ошибка выполнения запроса", err)
finally:
    # Закрываем соединение
    con.close()
    print("Соединение успешно закрыто")
Решение тривиальное и не требует дополнительных комментариев. Для коммуникации с созданной БД я использовал DBeaver. Клиентское приложение хорошо себя зарекомендовало, поэтому я часто использую его для разработки SQL запросов.
В рукописи Молинаро приводится два варианта решения данной задачи, но, по сути, это один и тот же код. Просто в первом случае не применяются оконные функции, а во-втором они присутствуют. Я выбрал последний, чтобы упростить ход рассуждений. Текстовую версию скрипта и все вспомогательные файлы к публикации вы можете найти по адресу (ссылка).
select
      p3.proj_group,
      min(p3.proj_start) as date_start,
      max(p3.proj_end) as date_end,
      julianday(max(p3.proj_end))-julianday( min(p3.proj_end))+1 as delta
from
    (select
       p2.*,
       sum(p2.flag)over(order by p2.proj_id) as proj_group
  from
    (select
          p.proj_id ,
          p.proj_start,
          p.proj_end,
          case
          when lag(p.proj_end)over(order by p.proj_id) = p.proj_start then 0 else 1
          end as flag
    from projects as p) as p2) as p3
group by p3.proj_group
Если вы раньше уже использовали оконные функции, то разобраться самостоятельно с написанной конструкцией не составит никакого труда. Я лишь кратко опишу логику. Первоначальная таблица представляет собой последовательные шаги, для которых заданы два параметра: дата начала и дата конца. Если дата начала шага соответствует дате конца предыдущего шага, то два шага считаются одной сессией. Следовательно, начинать расчеты нужно со смещения, за это отвечает оконная функция lag. На следующем этапе сравниваем дату старта текущего шага и дату конца предыдущего и выводим либо 0, либо 1. Если к новому столбцу применить суммирование с нарастающим итогом, то получим номера сессий. Стандартная группировка по номерам с агрегирующими функциями позволит извлечь начало и конец диапазона значений. Я также рассчитал дельту между двумя датами на случай, если потребуется установить самую длинную или короткую сессию.  Приведенный код будет актуален и для других БД. Ошибка будет выводиться только на строчке, где находится разница между двумя датами (функция julianday это прерогатива SQLite). На этом первая часть тестового задания выполнена. Переходим к Spark.Если верить Википедии, то Apache Spark это фреймворк с открытым исходным кодом для реализации распределённой обработки неструктурированных и слабоструктурированных данных, входящий в экосистему проектов Hadoop. Так как я не пишу на Java, Scala или R, то для получения функциональности Spark решил использовать PySpark. Устанавливать на компьютер все необходимые для работы компоненты я не стал. Для экспериментов выбрал облачный сервис Google Colab, так как у меня уже был заведенный аккаунт. Основной минус - при каждом новом сеансе работы нужно заново скачивать файлы, связанные с запуском нашего инструмента. На просторах Интернета я встречал вариант с фиксированной установкой, но пока не пробовал его на практике.
С помощью базовых команд Linux мы устанавливаем OpenJDK, скачиваем и разархивируем файлы Spark. Затем прописываем две переменные среды. Нужно не забыть о вспомогательной библиотеке findspark. Подготовительная работа закончена, осталось только открыть сессию.В идеале следует импортировать файл с БД SQLite в облако и подключаться к нему, но я решил облегчить себе жизнь и сформировал датафрейм прямо в ноутбуке. Чтобы даты воспринимались как даты, потребовалось написать собственную функцию.
Так как операций в Spark довольно много, рекомендую сразу обзавестись шпаргалками. Если говорить о литературе для изучения данного инструмента, то радует два факта. Во-первых, есть как англоязычные, так и переводные издания, а во-вторых, источников информации предостаточно. Если вы не владеете языком Шекспира, то могу порекомендовать в первую очередь “Изучаем Spark. Молниеносный анализ данных”, авторы Холден Карау, Энди Конвински, Патрик Венделл, Матей Захария. После того, как датафрейм подготовлен, можно пойти сначала самым простым путем и использовать уже имеющийся скрипт SQL. В код нужно лишь внести две правки: изменить имя базовой таблицы, изменить способ нахождения дельты (функция datediff).
Строго говоря, мы уже выполнили вторую часть тестового задания. Но, что-то мне подсказывает, что на собеседовании к данному способу могут придраться, аргументируя это тем, что это все тот же скрипт SQL без специфики Spark. Поэтому в конце статьи приведу псевдокод, который скорее всего уступает по производительности предыдущему решению, но лучше отражает способности соискателя программировать. Полную версию скрипта можно найти в ноутбуке.
from pyspark.sql.functions import lag
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Equivalent of Pandas.dataframe.shift() method
w = Window().partitionBy().orderBy(col("proj_id"))
df_dataframe = df.withColumn('lag', F.lag("proj_end").over(w))
#...
# Equivalent of SQL- CASE WHEN...THEN...ELSE... END
df_dataframe = df_dataframe.withColumn('flag',F.when(df_dataframe["proj_start"] == df_dataframe["lag"],0).otherwise(1))
#...
# Cumsum by column flag
w = Window().partitionBy().orderBy(col("proj_id"))
df_dataframe = df_dataframe.withColumn("proj_group", F.sum("flag").over(w))
#...
# Equivalent of SQL - GROUP BY
from pyspark.sql.functions import  min, max
df_group = df_dataframe.groupBy("proj_group").agg(min("proj_start").alias("date_start"), \
                                                  max("proj_end").alias("date_end"))
df_group = df_group.withColumn("delta", F.datediff(df_group.date_end,df_group.date_start))
df_group.show()
Краткие выводы.
  • Перед собеседованием внимательно изучайте список инструментов и технологий, с которыми работает компания. Не стоит ограничиваться позициями только из текста вакансии. В идеале нужно постоянно расширять свой профессиональный кругозор, чтобы отвечать на “каверзные” вопросы, которые задаются с цель всесторонне прощупать кандидата.
  • Даже если вы раньше никогда не работали со Spark, это не повод отказываться от конкурса на вакантную позицию. Основы PySpark можно освоить в сжатые сроки, при условии, что в бэкграунде уже есть опыт программирования с использованием библиотеки Pandas.
  • Недостатка в книгах по Spark не наблюдается.
На этом все. Всем здоровья, удачи и профессиональных успехов!
===========
Источник:
habr.com
===========

Похожие новости: Теги для поиска: #_python, #_sql, #_big_data, #_sqlite, #_data_engineering, #_sql, #_sqlite, #_python, #_spark, #_python, #_sql, #_big_data, #_sqlite, #_data_engineering
Профиль  ЛС 
Показать сообщения:     

Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы

Текущее время: 27-Апр 22:05
Часовой пояс: UTC + 5