[Data Mining, Big Data, R, Управление разработкой] Немного о параллельных вычислениях в R
Автор
Сообщение
news_bot ®
Стаж: 6 лет 9 месяцев
Сообщений: 27286
Публикация очень краткая. Многие думают, что параллельные вычисления в R -- это очень сложно и неприменимо к их текущим задачам. И да и нет. Если сознательно не вдаваться в теорию, железо и всякие подробности, то можно нарисовать «3 и 1/2» почти универсальных рецепта. Приведенные примеры сознательно похожи на продуктивные задачи, а не выхолощенные пара строчек синтетики.Является продолжением серии предыдущих публикаций.Используемые пакетыЗагрузка пакетов
library(tidyverse)
library(magrittr)
library(stringi)
library(glue)
library(dqrng)
library(iterators)
library(future)
library(foreach)
library(doFuture)
library(tictoc)
library(futile.logger)
library(lgr) # будем использовать его рутовый логгер `lgr`
library(hrbrthemes)
Паттерны параллелизацииПаттерн 1. Параллелизация tidyverse вычисленийСитуация. Есть скрипт, содержащий множество пайплайнов на tidyverse.Пример задачи. Подсчитаем среднее от суммы квадратов чисел. Для повышения эффективности параллельных вычислений важно уменьшить объемы перекачки данных между потоками. Используем пакет furrr.`tidyverse` pipeline
registerDoFuture()
# future::plan(multiprocess)
workers <- parallel::detectCores() - 1
future::plan(multisession, workers = workers)
num_row <- 1:10^6
ff_seq <- function(x) x^2
ff_par <- function(x) mean(x^2)
tic("Считаем последовательно")
lst1 <- num_row %>%
purrr::map_dbl(ff_seq) %>%
mean()
toc()
tic("Считаем параллельно, вариант 1")
lst2 <- num_row %>%
furrr::future_map_dbl(ff_seq) %>%
mean()
toc()
tic("Считаем параллельно, вариант 2")
lst2 <- num_row %>%
split(cut(seq_along(.), workers, labels = FALSE)) %>%
furrr::future_map_dbl(ff_par) %>%
mean()
toc()
Естественно, результат зависит от аппаратной платформы и ОС, на которой все запускается. На тестовом прогоне у меня такая раскладка:
Считаем последовательно: 7.23 sec elapsed
Считаем параллельно, вариант 1: 3.43 sec elapsed
Считаем параллельно, вариант 2: 0.64 sec elapsed
Windows и Linux достаточно сильно отличаются по методам параллелизации. Linux в продуктиве сильно предпочтительнее Windows.Паттерн 2. Локальная ручная параллелизацияСитация. В ряде случаев при работе скрипта необходимо выполнить незначительное число разовых неунифицируемых операций. Например, загрузка справочников и различных первичных данных. Есть возможное решение, функция%<-%.Генерация сэмплов
# создаем последовательность, матрица 20 атрибутов на 10^5 событий
nn <- 10^5
tic("Generating sample data.frame")
df <- 100 %>%
# stri_rand_strings(length = 10, pattern = "[a-z]") %>%
sample(10^4:10^5, .) %>%
sample(20 * nn, replace = TRUE) %>%
matrix(byrow = TRUE, ncol = 20) %>%
as_tibble(.name_repair = "universal") %>%
mutate(user_id = as.character(sample(1:as.integer(nn/10), n(), replace = TRUE))) %>%
# сгенерируем версию объекта
mutate(ver = sample(1:20, n(), replace = TRUE)) %>%
select(user_id, ver, everything())
toc()
# сохраним в файл для последующей демонстрации параллелизации
demo_fpath <- here::here("temp", "demo_data.xlsx")
openxlsx::write.xlsx(df, demo_fpath, asTable = TRUE)
Два варианта загрузки файлов
plan(multisession, workers = parallel::detectCores() - 2)
# plan(sequential)
# https://github.com/HenrikBengtsson/future
# считаем, что воркеров у нас 2
tic("Объединяем последовательно обработанные файлы")
tic("Читаем файлы последовательно")
res_lst <- list()
for (j in 1:6) {
res_lst[[j]] <- { readxl::read_excel(demo_fpath) %>% head(5)}
}
toc()
seq_df <- bind_rows(res_lst)
toc()
tic("Объединяем параллельно обработанные файлы")
tic("Читаем файлы параллельно")
df1 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df2 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df3 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df4 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df5 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
df6 %<-% { readxl::read_excel(demo_fpath) %>% head(5)}
toc()
par_df <- bind_rows(df1, df2, df3, df4, df5, df6)
toc()
all_equal(seq_df, par_df)
Некая разница наблюдается. Пример исключительно для демонстрации принципа. На тестовом прогоне у меня такая раскладка:
Объединяем последовательно обработанные файлы: 46.23 sec elapsed
Объединяем параллельно обработанные файлы: 37.82 sec elapsed
Паттерн 3. Параллелизация сложного процессингаСитуация. Много вычислительного данных, много кода, процессинг потенциально независим.Пример.
Сделаем общее задание. Будем считать число сочетаний $C_n^k$. Дополнительно добавим несколько вариантов логирования при параллельных вычислениях.Генерация списка заданий.Подготовка логгеров
# подготовка логгеров
flog_logname <- here::here("log", "job_futile.log")
lgr_logname <- here::here("log", "job_lgr.log")
initLogging <- function(log_file){
lgr <- get_logger_glue("logger")
lgr$set_propagate(FALSE)
lgr$set_threshold("all")
lgr$set_appenders(list(
console = AppenderConsole$new(
threshold = "info"
),
file = AppenderFile$new(
file = log_file,
threshold = "all"
)
))
lgr
}
invisible(flog.appender(appender.tee(flog_logname)))
invisible(flog.threshold(INFO))
lgr <- initLogging(lgr_logname)
Многопоточные расчеты
"Start batch processing" %T>%
flog.info() %T>%
lgr$info()
# инициализируем параллельную обработку
# https://github.com/HenrikBengtsson/doFuture
# https://cran.r-project.org/web/packages/future/vignettes/future-1-overview.html
registerDoFuture()
# future::plan(multiprocess)
future::plan(multisession, workers = parallel::detectCores())
# future::plan(sequential)
# plan(future.callr::callr)
tic("Batch processing")
start_time <- Sys.time()
foreach(it = iter(jobs_tbl, by = "row"), .export = c("start_time"),
# .packages = 'futile.logger',
.verbose = FALSE, .inorder = FALSE, .errorhandling = "remove") %dopar% {
start <- Sys.time() - start_time
# инициализируем логгер в потоке
flog.appender(appender.tee(flog_logname))
lgr <- initLogging(lgr_logname)
res <- arrangements::npermutations(k = it$k, n = it$n, bigz = TRUE)
# https://www.jottr.org/2020/11/06/future-1.20.1-the-future-just-got-a-bit-brighter/
message(" message from thread")
glue("Step {it$idx_str} finished. RAM used {capture.output(pryr::mem_used())}.",
"PID: {Sys.getpid()}",
"Elapsed {round(difftime(Sys.time(), start_time, units = 'mins'), digits = 2)} min(s) ----------->",
.sep = " ") %T>%
flog.info() %T>%
lgr$info()
# вернем тайминги тоже
return(list(pid = Sys.getpid(), start = start, finish = Sys.time() - start_time))
} -> output_lst
flog.info("Foreach finished")
checkmate::assertList(output_lst, any.missing = FALSE, null.ok = FALSE, min.len = 1)
output_tbl <- dplyr::bind_rows(output_lst)
# rm(output_lst)
# терминируем параллельную обработку --------------
future::plan(sequential)
gc(reset = TRUE, full = TRUE)
flog.info(capture.output(toc()))
Для иллюстрации процесса нарисуем график запуска (точка) и завершения (крестик) задач на вычислителях. Хорошо видны первичные затраты на старт потоков windows.
Код построения графика.
# посмотрим графически на порядок запуска вычислителей
output_tbl %>%
mutate_at("pid", as.factor) %>%
mutate_at(vars(start, finish), as.numeric) %>%
ggplot(aes(start, pid, colour = pid)) +
geom_point(size = 3, alpha = .7) +
geom_point(aes(x=finish), shape = 4, size = 3, colour = "black") +
geom_vline(aes(xintercept = start, colour = pid), lty = "dashed", alpha = 0.7) +
ggthemes::scale_fill_tableau("Tableau 10") +
theme_ipsum_rc() +
xlim(c(0, 5))
ЗаключениеПри параллелизации задач, для достижения максимальной эффективности вычислений следует учитывать ряд важных моментов, вытекающих из принципов работы компьютеров, ОС и теоретических пределов. Если не погружаться глубоко в детали, резюмируем в виде «проверочных пунктов»:
- Инициализация вычислителей (worker) является достаточно дорогостоящей. Требуется породить новое окружение (поток, кластер, …), его инициализировать. Для коротких вычислений (секунды) затраты на инициализацию могут оказаться существенно выше однопоточного вычисления.
- При выделении потоков на одной машине, рекомендуется отдавать под вычислители core - 1, или чуть меньше. Один поток выполняет роль мастера, раздающего задания и выполняющего reduce ответов, получаемых от вычислителей. Ну и самой операционке тоже могут быть нужны ресурсы.
- Дескрипторы файлов и коннектов к БД не переходят границы потоков.
- Накладные расходы на перегон больших объемов данных из мастер потока в вычислитель и обратно могут оказаться по времени существенно выше, чем время вычисления. Оптимально, если мастер поток дает метаинформацию по заданию, а вычислитель уже сам загружает эти данные (из БД, из файлов, из API и т.д.). Ну и результат наверх должен уходить максимально агрегированный.
- Состав заданий надо максимально готовить в мастер потоке, а вниз спускать грубую вычислительную подзадачу. Повышается прозрачность и воспроизводимость.
- Для ряда задач, связанных с длинными синхронными запросами внешних системы (типичные представители -- REST API/Web scrapping), можно создавать вычислителей много больше чем доступных ядер. Они все равно висят большую часть времени в режиме ожидания. Можно запускать в виде отдельных процессов ОС с помощью настройки соответствующего бэкенда registerDoFuture(); plan(future.callr::callr). Это оставшаяся 1/2 рецепта.
Предыдущая публикация -- «Нюансы эксплуатации R решений в enterprise окружении?».
===========
Источник:
habr.com
===========
Похожие новости:
- [Сетевые технологии, Беспроводные технологии, Разработка систем связи, Научно-популярное, Космонавтика] Как работает спутниковая сеть StarLink (собственный анализ)
- [.NET, C#] Транслируй меня полностью
- [Системное администрирование, Программирование, IT-инфраструктура, DevOps] CI/CD — обещания и реальность (перевод)
- [Социальные сети и сообщества, Финансы в IT, IT-компании] 30 самых дорогих компаний Рунета на начало 2021 года по версии Forbes
- Выпуск Tor Browser 10.0.12 и дистрибутив Tails 4.16
- [Ненормальное программирование, Разработка под Arduino, DIY или Сделай сам, Электроника для начинающих] Измеритель расстояния на HC-SR04 без микроконтроллера
- [Информационная безопасность, Системное администрирование, Сетевые технологии] Как устроена фильтрация спама в Zimbra
- [Платежные системы, Разработка под e-commerce, Управление e-commerce, Управление продажами] Особенности национальной интеграции с платёжными системами
- Уязвимости во FreeBSD, позволяющие обойти ограничения jail-окружений
- [Высокая производительность, SQL, Проектирование и рефакторинг, Microsoft SQL Server] Напильник и щепотка фантазии… или как слепить Enterprise из SQL Server Express Edition
Теги для поиска: #_data_mining, #_big_data, #_r, #_upravlenie_razrabotkoj (Управление разработкой), #_data_science, #_enterprise, #_big_data, #_data_mining, #_big_data, #_r, #_upravlenie_razrabotkoj (
Управление разработкой
)
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 23-Ноя 01:50
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
Публикация очень краткая. Многие думают, что параллельные вычисления в R -- это очень сложно и неприменимо к их текущим задачам. И да и нет. Если сознательно не вдаваться в теорию, железо и всякие подробности, то можно нарисовать «3 и 1/2» почти универсальных рецепта. Приведенные примеры сознательно похожи на продуктивные задачи, а не выхолощенные пара строчек синтетики.Является продолжением серии предыдущих публикаций.Используемые пакетыЗагрузка пакетов library(tidyverse)
library(magrittr) library(stringi) library(glue) library(dqrng) library(iterators) library(future) library(foreach) library(doFuture) library(tictoc) library(futile.logger) library(lgr) # будем использовать его рутовый логгер `lgr` library(hrbrthemes) registerDoFuture()
# future::plan(multiprocess) workers <- parallel::detectCores() - 1 future::plan(multisession, workers = workers) num_row <- 1:10^6 ff_seq <- function(x) x^2 ff_par <- function(x) mean(x^2) tic("Считаем последовательно") lst1 <- num_row %>% purrr::map_dbl(ff_seq) %>% mean() toc() tic("Считаем параллельно, вариант 1") lst2 <- num_row %>% furrr::future_map_dbl(ff_seq) %>% mean() toc() tic("Считаем параллельно, вариант 2") lst2 <- num_row %>% split(cut(seq_along(.), workers, labels = FALSE)) %>% furrr::future_map_dbl(ff_par) %>% mean() toc() Считаем последовательно: 7.23 sec elapsed
Считаем параллельно, вариант 1: 3.43 sec elapsed Считаем параллельно, вариант 2: 0.64 sec elapsed # создаем последовательность, матрица 20 атрибутов на 10^5 событий
nn <- 10^5 tic("Generating sample data.frame") df <- 100 %>% # stri_rand_strings(length = 10, pattern = "[a-z]") %>% sample(10^4:10^5, .) %>% sample(20 * nn, replace = TRUE) %>% matrix(byrow = TRUE, ncol = 20) %>% as_tibble(.name_repair = "universal") %>% mutate(user_id = as.character(sample(1:as.integer(nn/10), n(), replace = TRUE))) %>% # сгенерируем версию объекта mutate(ver = sample(1:20, n(), replace = TRUE)) %>% select(user_id, ver, everything()) toc() # сохраним в файл для последующей демонстрации параллелизации demo_fpath <- here::here("temp", "demo_data.xlsx") openxlsx::write.xlsx(df, demo_fpath, asTable = TRUE) plan(multisession, workers = parallel::detectCores() - 2)
# plan(sequential) # https://github.com/HenrikBengtsson/future # считаем, что воркеров у нас 2 tic("Объединяем последовательно обработанные файлы") tic("Читаем файлы последовательно") res_lst <- list() for (j in 1:6) { res_lst[[j]] <- { readxl::read_excel(demo_fpath) %>% head(5)} } toc() seq_df <- bind_rows(res_lst) toc() tic("Объединяем параллельно обработанные файлы") tic("Читаем файлы параллельно") df1 %<-% { readxl::read_excel(demo_fpath) %>% head(5)} df2 %<-% { readxl::read_excel(demo_fpath) %>% head(5)} df3 %<-% { readxl::read_excel(demo_fpath) %>% head(5)} df4 %<-% { readxl::read_excel(demo_fpath) %>% head(5)} df5 %<-% { readxl::read_excel(demo_fpath) %>% head(5)} df6 %<-% { readxl::read_excel(demo_fpath) %>% head(5)} toc() par_df <- bind_rows(df1, df2, df3, df4, df5, df6) toc() all_equal(seq_df, par_df) Объединяем последовательно обработанные файлы: 46.23 sec elapsed
Объединяем параллельно обработанные файлы: 37.82 sec elapsed Сделаем общее задание. Будем считать число сочетаний $C_n^k$. Дополнительно добавим несколько вариантов логирования при параллельных вычислениях.Генерация списка заданий.Подготовка логгеров # подготовка логгеров
flog_logname <- here::here("log", "job_futile.log") lgr_logname <- here::here("log", "job_lgr.log") initLogging <- function(log_file){ lgr <- get_logger_glue("logger") lgr$set_propagate(FALSE) lgr$set_threshold("all") lgr$set_appenders(list( console = AppenderConsole$new( threshold = "info" ), file = AppenderFile$new( file = log_file, threshold = "all" ) )) lgr } invisible(flog.appender(appender.tee(flog_logname))) invisible(flog.threshold(INFO)) lgr <- initLogging(lgr_logname) "Start batch processing" %T>%
flog.info() %T>% lgr$info() # инициализируем параллельную обработку # https://github.com/HenrikBengtsson/doFuture # https://cran.r-project.org/web/packages/future/vignettes/future-1-overview.html registerDoFuture() # future::plan(multiprocess) future::plan(multisession, workers = parallel::detectCores()) # future::plan(sequential) # plan(future.callr::callr) tic("Batch processing") start_time <- Sys.time() foreach(it = iter(jobs_tbl, by = "row"), .export = c("start_time"), # .packages = 'futile.logger', .verbose = FALSE, .inorder = FALSE, .errorhandling = "remove") %dopar% { start <- Sys.time() - start_time # инициализируем логгер в потоке flog.appender(appender.tee(flog_logname)) lgr <- initLogging(lgr_logname) res <- arrangements::npermutations(k = it$k, n = it$n, bigz = TRUE) # https://www.jottr.org/2020/11/06/future-1.20.1-the-future-just-got-a-bit-brighter/ message(" message from thread") glue("Step {it$idx_str} finished. RAM used {capture.output(pryr::mem_used())}.", "PID: {Sys.getpid()}", "Elapsed {round(difftime(Sys.time(), start_time, units = 'mins'), digits = 2)} min(s) ----------->", .sep = " ") %T>% flog.info() %T>% lgr$info() # вернем тайминги тоже return(list(pid = Sys.getpid(), start = start, finish = Sys.time() - start_time)) } -> output_lst flog.info("Foreach finished") checkmate::assertList(output_lst, any.missing = FALSE, null.ok = FALSE, min.len = 1) output_tbl <- dplyr::bind_rows(output_lst) # rm(output_lst) # терминируем параллельную обработку -------------- future::plan(sequential) gc(reset = TRUE, full = TRUE) flog.info(capture.output(toc())) Код построения графика. # посмотрим графически на порядок запуска вычислителей
output_tbl %>% mutate_at("pid", as.factor) %>% mutate_at(vars(start, finish), as.numeric) %>% ggplot(aes(start, pid, colour = pid)) + geom_point(size = 3, alpha = .7) + geom_point(aes(x=finish), shape = 4, size = 3, colour = "black") + geom_vline(aes(xintercept = start, colour = pid), lty = "dashed", alpha = 0.7) + ggthemes::scale_fill_tableau("Tableau 10") + theme_ipsum_rc() + xlim(c(0, 5))
=========== Источник: habr.com =========== Похожие новости:
Управление разработкой ) |
|
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 23-Ноя 01:50
Часовой пояс: UTC + 5