[Data Mining, Big Data, R, Управление разработкой] Немного о параллельных вычислениях в R

Автор Сообщение
news_bot ®

Стаж: 6 лет 4 месяца
Сообщений: 27286

Создавать темы news_bot ® написал(а)
24-Фев-2021 17:32

Публикация очень краткая. Многие думают, что параллельные вычисления в R -- это очень сложно и неприменимо к их текущим задачам. И да и нет. Если сознательно не вдаваться в теорию, железо и всякие подробности, то можно нарисовать «3 и 1/2» почти универсальных рецепта. Приведенные примеры сознательно похожи на продуктивные задачи, а не выхолощенные пара строчек синтетики.Является продолжением серии предыдущих публикаций.Используемые пакетыЗагрузка пакетов
library(tidyverse)
library(magrittr)
library(stringi)
library(glue)
library(dqrng)
library(iterators)
library(future)
library(foreach)
library(doFuture)
library(tictoc)
library(futile.logger)
library(lgr) # будем использовать его рутовый логгер `lgr`
library(hrbrthemes)
Паттерны параллелизацииПаттерн 1. Параллелизация tidyverse вычисленийСитуация. Есть скрипт, содержащий множество пайплайнов на tidyverse.Пример задачи. Подсчитаем среднее от суммы квадратов чисел. Для повышения эффективности параллельных вычислений важно уменьшить объемы перекачки данных между потоками. Используем пакет furrr.`tidyverse` pipeline
registerDoFuture()
# future::plan(multiprocess)
workers <- parallel::detectCores() - 1
future::plan(multisession, workers = workers)
num_row <- 1:10^6
ff_seq <- function(x) x^2
ff_par <- function(x) mean(x^2)
tic("Считаем последовательно")
lst1 <- num_row %>%
  purrr::map_dbl(ff_seq) %>%
  mean()
toc()
tic("Считаем параллельно, вариант 1")
lst2 <- num_row %>%
  furrr::future_map_dbl(ff_seq) %>%
  mean()
toc()
tic("Считаем параллельно, вариант 2")
lst2 <- num_row %>%
  split(cut(seq_along(.), workers, labels = FALSE)) %>%
  furrr::future_map_dbl(ff_par) %>%
  mean()
toc()
Естественно, результат зависит от аппаратной платформы и ОС, на которой все запускается. На тестовом прогоне у меня такая раскладка:
Считаем последовательно: 7.23 sec elapsed
Считаем параллельно, вариант 1: 3.43 sec elapsed
Считаем параллельно, вариант 2: 0.64 sec elapsed
Windows и Linux достаточно сильно отличаются по методам параллелизации. Linux в продуктиве сильно предпочтительнее Windows.Паттерн 2. Локальная ручная параллелизацияСитация. В ряде случаев при работе скрипта необходимо выполнить незначительное число разовых неунифицируемых операций. Например, загрузка справочников и различных первичных данных. Есть возможное решение, функция%<-%.Генерация сэмплов
# создаем последовательность, матрица 20 атрибутов на 10^5 событий
nn <- 10^5
tic("Generating sample data.frame")
df <- 100 %>%
  # stri_rand_strings(length = 10, pattern = "[a-z]") %>%
  sample(10^4:10^5, .) %>%
  sample(20 * nn, replace = TRUE) %>%
  matrix(byrow = TRUE, ncol = 20) %>%
  as_tibble(.name_repair = "universal") %>%
  mutate(user_id = as.character(sample(1:as.integer(nn/10), n(), replace = TRUE))) %>%
  # сгенерируем версию объекта
  mutate(ver = sample(1:20, n(), replace = TRUE)) %>%
  select(user_id, ver, everything())
toc()
# сохраним в файл для последующей демонстрации параллелизации
demo_fpath <- here::here("temp", "demo_data.xlsx")
openxlsx::write.xlsx(df, demo_fpath, asTable = TRUE)
Два варианта загрузки файлов
plan(multisession, workers = parallel::detectCores() - 2)
# plan(sequential)
# https://github.com/HenrikBengtsson/future
# считаем, что воркеров у нас 2
tic("Объединяем последовательно обработанные файлы")
tic("Читаем файлы последовательно")
res_lst <- list()
for (j in 1:6) {
  res_lst[[j]] <- { readxl::read_excel(demo_fpath) %>% head(5)}
}
toc()
seq_df <- bind_rows(res_lst)
toc()
tic("Объединяем параллельно обработанные файлы")
tic("Читаем файлы параллельно")
df1 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df2 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df3 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df4 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df5 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df6 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
toc()
par_df <- bind_rows(df1, df2, df3, df4, df5, df6)
toc()
all_equal(seq_df, par_df)
Некая разница наблюдается. Пример исключительно для демонстрации принципа. На тестовом прогоне у меня такая раскладка:
Объединяем последовательно обработанные файлы: 46.23 sec elapsed
Объединяем параллельно обработанные файлы: 37.82 sec elapsed
Паттерн 3. Параллелизация сложного процессингаСитуация. Много вычислительного данных, много кода, процессинг потенциально независим.Пример.
Сделаем общее задание. Будем считать число сочетаний $C_n^k$. Дополнительно добавим несколько вариантов логирования при параллельных вычислениях.Генерация списка заданий.Подготовка логгеров
# подготовка логгеров
flog_logname <- here::here("log", "job_futile.log")
lgr_logname <- here::here("log", "job_lgr.log")
initLogging <- function(log_file){
  lgr <- get_logger_glue("logger")
  lgr$set_propagate(FALSE)
  lgr$set_threshold("all")
  lgr$set_appenders(list(
    console = AppenderConsole$new(
      threshold = "info"
    ),
    file = AppenderFile$new(
      file = log_file,
      threshold = "all"
    )
  ))
  lgr
}
invisible(flog.appender(appender.tee(flog_logname)))
invisible(flog.threshold(INFO))
lgr <- initLogging(lgr_logname)
Многопоточные расчеты
"Start batch processing" %T>%
  flog.info() %T>%
  lgr$info()
# инициализируем параллельную обработку
# https://github.com/HenrikBengtsson/doFuture
# https://cran.r-project.org/web/packages/future/vignettes/future-1-overview.html
registerDoFuture()
# future::plan(multiprocess)
future::plan(multisession, workers = parallel::detectCores())
# future::plan(sequential)
# plan(future.callr::callr)
tic("Batch processing")
start_time <- Sys.time()
foreach(it = iter(jobs_tbl, by = "row"), .export = c("start_time"),
        # .packages = 'futile.logger',
        .verbose = FALSE, .inorder = FALSE, .errorhandling = "remove") %dopar% {
          start <- Sys.time() - start_time
          # инициализируем логгер в потоке
          flog.appender(appender.tee(flog_logname))
          lgr <- initLogging(lgr_logname)
          res <- arrangements::npermutations(k = it$k, n = it$n, bigz = TRUE)
          # https://www.jottr.org/2020/11/06/future-1.20.1-the-future-just-got-a-bit-brighter/
          message("     message from thread")
          glue("Step {it$idx_str} finished. RAM used {capture.output(pryr::mem_used())}.",
               "PID: {Sys.getpid()}",
               "Elapsed {round(difftime(Sys.time(), start_time, units = 'mins'), digits = 2)} min(s) ----------->",
               .sep = " ") %T>%
            flog.info() %T>%
            lgr$info()
          # вернем тайминги тоже
          return(list(pid = Sys.getpid(), start = start, finish = Sys.time() - start_time))
        } -> output_lst
flog.info("Foreach finished")
checkmate::assertList(output_lst, any.missing = FALSE, null.ok = FALSE, min.len = 1)
output_tbl <- dplyr::bind_rows(output_lst)
# rm(output_lst)
# терминируем параллельную обработку --------------
future::plan(sequential)
gc(reset = TRUE, full = TRUE)
flog.info(capture.output(toc()))
Для иллюстрации процесса нарисуем график запуска (точка) и завершения (крестик) задач на вычислителях. Хорошо видны первичные затраты на старт потоков windows.
Код построения графика.
# посмотрим графически на порядок запуска вычислителей
output_tbl %>%
  mutate_at("pid", as.factor) %>%
  mutate_at(vars(start, finish), as.numeric) %>%
  ggplot(aes(start, pid, colour = pid)) +
  geom_point(size = 3, alpha = .7) +
  geom_point(aes(x=finish), shape = 4, size = 3, colour = "black") +
  geom_vline(aes(xintercept = start, colour = pid), lty = "dashed", alpha = 0.7) +
  ggthemes::scale_fill_tableau("Tableau 10") +
  theme_ipsum_rc() +
  xlim(c(0, 5))
ЗаключениеПри параллелизации задач, для достижения максимальной эффективности вычислений следует учитывать ряд важных моментов, вытекающих из принципов работы компьютеров, ОС и теоретических пределов. Если не погружаться глубоко в детали, резюмируем в виде «проверочных пунктов»:
  • Инициализация вычислителей (worker) является достаточно дорогостоящей. Требуется породить новое окружение (поток, кластер, …), его инициализировать. Для коротких вычислений (секунды) затраты на инициализацию могут оказаться существенно выше однопоточного вычисления.
  • При выделении потоков на одной машине, рекомендуется отдавать под вычислители core - 1, или чуть меньше. Один поток выполняет роль мастера, раздающего задания и выполняющего reduce ответов, получаемых от вычислителей. Ну и самой операционке тоже могут быть нужны ресурсы.
  • Дескрипторы файлов и коннектов к БД не переходят границы потоков.
  • Накладные расходы на перегон больших объемов данных из мастер потока в вычислитель и обратно могут оказаться по времени существенно выше, чем время вычисления. Оптимально, если мастер поток дает метаинформацию по заданию, а вычислитель уже сам загружает эти данные (из БД, из файлов, из API и т.д.). Ну и результат наверх должен уходить максимально агрегированный.
  • Состав заданий надо максимально готовить в мастер потоке, а вниз спускать грубую вычислительную подзадачу. Повышается прозрачность и воспроизводимость.
  • Для ряда задач, связанных с длинными синхронными запросами внешних системы (типичные представители -- REST API/Web scrapping), можно создавать вычислителей много больше чем доступных ядер. Они все равно висят большую часть времени в режиме ожидания. Можно запускать в виде отдельных процессов ОС с помощью настройки соответствующего бэкенда registerDoFuture(); plan(future.callr::callr). Это оставшаяся 1/2 рецепта.
Предыдущая публикация -- «Нюансы эксплуатации R решений в enterprise окружении?».
===========
Источник:
habr.com
===========

Похожие новости: Теги для поиска: #_data_mining, #_big_data, #_r, #_upravlenie_razrabotkoj (Управление разработкой), #_data_science, #_enterprise, #_big_data, #_data_mining, #_big_data, #_r, #_upravlenie_razrabotkoj (
Управление разработкой
)
Профиль  ЛС 
Показать сообщения:     

Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы

Текущее время: 01-Июл 09:29
Часовой пояс: UTC + 5