[Программирование, Scala] Основы Cat Concurrency с Ref и Deferred (перевод)
Автор
Сообщение
news_bot ®
Стаж: 6 лет 9 месяцев
Сообщений: 27286
Параллельный доступ и ссылочная прозрачность
Для будущих учащихся на курсе «Scala-разработчик» приготовили перевод материала.
Приглашаем также на вебинар по теме «Эффекты в Scala». На занятии рассмотрим понятие эффекта и сложности, которые могут возникать при их наличии. Также введем понятие функционального эффекта, рассмотрим его свойства и реализуем свой небольшой функциональный эффект. Присоединяйтесь.
*Concurrency — конкурентность, допускающая одновременное выполнение нескольких вычислительных процессов.Ref и Deferred являются основными строительными блоками в FP, используемыми параллельно, в манере concurrent. Особенно при использовании c tagless final (неразмеченной конечной) абстракцией, эти два блока, при построении бизнес-логики, могут дать нам и то, и другое: параллельный доступ (concurrent access) и ссылочную прозрачность (referential transparency), и мы можем использовать их для построения более продвинутых структур, таких как counters (счетчики) и state machines (конечные автоматы).Перед тем, как мы углубимся в Ref и Deferred, нам полезно узнать, что concurrency в Cats строится на Java AtomicReference, и здесь мы и начнем наше путешествие.Atomic ReferenceAtomicReference — это один из элементов пакета java.util.concurrent.atomic. В Oracle docs мы можем прочитать, что java.util.concurrent.atomic — это:
Небольшой инструментарий классов, поддерживающих потокобезопасное программирование «без блоков» с одиночными переменными. По сути, классы в данном пакете расширяют понятие volatile значений, полей и элементов массива до тех, которые также обеспечивают условную операцию atomic обновления…Экземпляры классов AtomicBoolean, AtomicInteger, AtomicLong, и AtomicReference обеспечивают доступ и обновление от одиночных переменных к соответствующему типу (функционального блока).
AtomicReference с нами начиная с Java 1.5 и используется для получения лучшей производительности, чем синхронизации (хотя это не всегда так).Когда вам приходится совместно использовать некоторые данные между нитями (threads), вы должны защитить доступ к этой части данных. Самым простым примером будет увеличение некоторого количества int: i = i + 1. Наш пример состоит из фактически 3 операций, сначала мы читаем значение i , затем добавляем 1 к этому значению, а в конце снова присваиваем вычисленное значение i . В отношении многопоточных приложений, мы можем столкнуться с ситуацией, когда каждый thread будет выполнять эти 3 шага между шагами другого thread, а конечное значение i предсказать не удастся.Обычно в вашей голове появляется слово synchronised или механизм класса lock, но с atomic.* вам больше не нужно беспокоиться о явной синхронизации, и вы можете перейти на предоставленные atomic (атомарные) типы утилит, где проверка выполнения операции в один шаг включается автоматически.Давайте, возьмем для примера AtomicInteger.incrementAndGet:
/**
* Atomically increments by one the current value.
*
* @return the updated value
*/
public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
}
}
С помощью операции compareAndSet мы либо обновляем наши данные, либо терпим неудачу, но никогда не заставляем thread ждать. Таким образом, если операция compareAndSet в incrementAndGet не удаётся, мы просто пытаемся повторить всю операцию заново, извлекая текущее значение наших данных с помощью функции get() в начале. С другой стороны, при использовании синхронизированных механизмов нет ограничений на количество операторов (statement), которые вы хотите «выполнить» во время блокировки, но этот блок никогда не выйдет из строя и может заставить вызывающий thread ждать, предоставляя возможность заблокировать или снизить производительность.Теперь, зная определенные основы, давайте перейдем к нашей первой мега-звезде concurrency.RefRef в Cats очень похож на упомянутую выше atomic (атомарную) ссылку Java. Основные отличия заключаются в том, что Ref используется с tagless final абстракцией F . Он всегда содержит значение, а значение, содержащееся в Ref — типа A, всегда является неизменным (immutable).
abstract class Ref[F[_], A] {
def get: F[A]
def set(a: A): F[Unit]
def modify[B](f: A => (A, B)): F[B]
// ... and more
}
Ref[F[_], A] — это функциональная изменяемая (mutable) ссылка:
- Concurrent ( конкурентная)
- Lock free ( “без блоков”)
- Всегда содержит значение
Она создается путем предоставления начального значения, и каждая операция осуществляется в
F, например, cats.effect.IO.Если мы внимательно посмотрим на сопутствующий объект для Cats Ref, мы увидим, что наша F должна соответствовать некому требованию, а именно быть Sync.
def of[F[_], A](a: A)(implicit F: Sync[F]): F[Ref[F, A]] = F.delay(unsafe(a))
Вышеприведенный метод является лишь примером многих операций, доступных на нашем Ref; он используется для построения Ref с исходным значением. Sync дает нам возможность приостанавливать любые побочные эффекты с помощью метода
delay для каждой операции на Ref.Ref — довольно простая конструкция, мы можем сосредоточиться в основном на ее get, set и of чтобы понять, как она работает.Метод get and set Допустим, у нас есть объект (для этого блога мы назовем его Shared), который нужно обновить несколькими threads, и мы используем для этого наши методы get и set , создавая утилитный метод, который поможет нам в дальнейшем:
def modifyShared(trace: Ref[IO, Shared], msg: String): IO[Unit] = {
for {
sh <- trace.get()
_ <- trace.set(Shared(sh, msg))
} yield ()
}
Наш Shared объект может быть построен путем использования его предыдущего состояния и нового значения для создания нового экземпляра — Shared, который может быть на самом деле всем, что мы хотим — простым списком, картой или чем угодно, к чему мы хотим получить одновременный безопасный доступ. Я только что создал Shared(prev: Shared, msg: String) для данной статьи.В нашем примере выше F был заменён конкретным IO из Cats Effect, но имейте в виду, что Ref является полиморфным в F и может быть использован с другими библиотеками.С помощью monadic (монадический) IO мы применяем функцию flatMap на каждом шаге и устанавливаем значение, сохраненное в нашем Ref на желаемое значение — или... подождите, может быть, мы этого не делаем.При таком подходе, когда modifyShared будет вызываться одновременно, и мы можем потерять обновления! Это происходит потому, что мы можем столкнуться с ситуацией, когда, например, двое threads могут прочитать значение с помощью get и каждый из них будет выполнять set одновременно. Методы get и set не вызываются атомарно (atomically) вместе.Atomic (атомарный) updateКонечно, мы можем улучшить приведенный выше пример и использовать другие доступные методы из Ref. Для совместной реализации get и set мы можем использовать update.
def update(f: A => A): F[Unit]
Это решит нашу проблему с обновлением значения, однако update имеет свои недостатки. Если мы захотим обратиться к переменной сразу после обновления, аналогично тому, как мы использовали get и set , мы можем в итоге получить устаревшие данные, допустим, наш Ref будет содержать ссылку на Int:
for {
_ <- someRef.update(_ + 1)
curr <- someRef.get
_ <- IO { println(s"current value is $curr")}
} yield ()
Нас спасет modify Мы можем немного улучшить вышеупомянутую ситуацию, используя modify , которая будет делать то же самое, что и update , но тем не менее, modify вернет нам обновленное значение для дальнейшего использования.
def modify[B](f: A => (A, B)): F[B] = {
@tailrec
def spin: B = {
val c = ar.get
val (u, b) = f(c)
if (!ar.compareAndSet(c, u)) spin
else b
}
F.delay(spin)
}
Как видите, это практически та же имплементация, что и в примере с AtomicInteger.incrementAndGet, который я показывал в начале, но только в Scala. Нам четко видно, что для выполнения своей работы Ref также работает на основе AtomicReference .Ref ограниченияВы, вероятно, уже заметили, что в случае неудачи при обновлении значения функция, переданная update/ modify, должна быть запущена недетерминированно (nondeterministically) и, возможно, должна быть запущена несколько раз. Хорошая новость заключается в том, что это решение в целом оказывается намного быстрее, чем стандартный механизм блокировки и синхронизации, и гораздо безопаснее, так как это решение не может быть заблокировано.Как только мы узнаем, как работает простой Ref, мы можем перейти к другому классу Cats Concurrent: Deferred (Отложенный вызов).DeferredВ отличие от Ref, Deferred:
- создается «пустым» (отложенный результат выполнения)
- может быть выполнен один раз
- и после установки его нельзя изменить или снова сделать «пустым».
Эти свойства делают Deferred простым и в то же время довольно интересным.
abstract class Deferred[F[_], A] {
def get: F[A]
def complete(a: A): F[Unit]
}
Deferred используется для явной функциональной синхронизации. Когда мы вызываем get в «пустой» Deferred мы устанавливаем блокировку до того момента, как значение станет вновь доступно. В соответствии с документацией из самого класса:
- Блокировка указана только семантическая, никакие реальные threads (нити) не блокируются имплементацией
Тот же вызов get «непустого» Deferred немедленно вернет сохраненное значение.Другой метод — complete — заполнит значение, если экземпляр пуст и при вызове «непустого» Deferred приведет к сбою (неудачная попытка IO).Здесь важно отметить, что Deferred требует, чтобы F было Concurrent, что означает, что его можно отменить.Хорошим примером использования Deferred является ситуация, когда одна часть вашего приложения должна ждать другую. Пример ниже взят из великолепного выступления Фабио Лабеллы на выставке Scala Italy 2019 — Composable Concurrency with Ref + Deferred available at Vimeo
def consumer(done: Deferred[IO, Unit]) = for {
c <- Consumer.setup
_ <- done.complete(())
msg <- c.read
_ <- IO(println(s"Received $msg"))
} yield ()
def producer(done: Deferred[IO, Unit]) = for {
p <- Producer.setup()
_ <- done.get
msg = "Msg A"
_ <- p.write(msg)
_ <- IO(println(s"Sent $msg"))
} yield ()
def prog = for {
d <- Deferred[IO, Unit]
_ <- consumer(d).start
_ <- producer(d).start
} yield ()
В приведенном выше примере у нас есть producer (производитель) и consumer (потребитель), и мы хотим, чтобы producer ждал, пока consumer setup закончится, прежде чем писать сообщения, в противном случае все, что бы мы ни написали в producer, будет потеряно. Для преодоления этой проблемы мы можем использовать общий экземпляр Deferred и блокировать get до тех пор, пока не будет заполнен экземпляр done Deferred со стороны consumer (значение в данном случае простая Unit () ).Конечно, вышеуказанное решение не обошлось без проблем, когда consumer setup никогда не прекращался, мы застревали в ожидании, а producer не мог отправлять сообщения. Чтобы преодолеть это, мы можем использовать таймаут с get , а также использовать Either[Throwable, Unit] или какую-либо другую конструкцию вместо простой Unit внутри нашего объекта Deferred.Deferred довольно прост, но в сочетании с Ref он может быть использован для построения более сложных структур данных, таких как semaphores (семафоры).Для получения более подробной информации я рекомендую вам ознакомиться с самой документацией о Cats, где вы можете узнать больше о Cats concurrency и структуре данных, которые она предоставляет.
===========
Источник:
habr.com
===========
===========
Автор оригинала: Krzysztof Grajek
===========Похожие новости:
- [Высокая производительность, Java, Микросервисы] Улучшаем производительность Java-микросервиса парой простых приемов (перевод)
- [DevOps, Kubernetes] Job’ы и Cronjob’ы Kubernetes (перевод)
- [Программирование, Учебный процесс в IT, Карьера в IT-индустрии] Опрос: «А вы бы отдали своего ребёнка в IT?»
- [Программирование, Разработка под iOS, Разработка под Android, Тестирование мобильных приложений] Автоматизация тестирования мобильных приложений. Часть 1: проверки, модули и базовые действия
- [Программирование, Разработка мобильных приложений, Dart, Flutter] Flutter 2: что нового (перевод)
- [Программирование, Машинное обучение, Визуальное программирование] «ДЕЛАЙ КАК Я!» — ИЗУЧАЕМ ИМПЕРСОНАТОР
- [Программирование, Управление разработкой, Управление персоналом, DevOps] Стратегии выплаты технического долга (перевод)
- [Системное администрирование, Программирование, IT-инфраструктура, DevOps] Самоподписные сертификаты кровавого энтерпрайза против вашего лампового CI/CD
- [Отладка, Программирование микроконтроллеров] Полноценная GDB отладка через USB на плате BluePill (STM32F103С8T6)
- [Программирование, Java] Поддержка Null в Protobuf (перевод)
Теги для поиска: #_programmirovanie (Программирование), #_scala, #_scala, #_funktsionalnoe_testirovanie (функциональное тестирование), #_software_engineering, #_concurrency, #_blog_kompanii_otus (
Блог компании OTUS
), #_programmirovanie (
Программирование
), #_scala
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 18:37
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
Параллельный доступ и ссылочная прозрачность Для будущих учащихся на курсе «Scala-разработчик» приготовили перевод материала.
Приглашаем также на вебинар по теме «Эффекты в Scala». На занятии рассмотрим понятие эффекта и сложности, которые могут возникать при их наличии. Также введем понятие функционального эффекта, рассмотрим его свойства и реализуем свой небольшой функциональный эффект. Присоединяйтесь. *Concurrency — конкурентность, допускающая одновременное выполнение нескольких вычислительных процессов.Ref и Deferred являются основными строительными блоками в FP, используемыми параллельно, в манере concurrent. Особенно при использовании c tagless final (неразмеченной конечной) абстракцией, эти два блока, при построении бизнес-логики, могут дать нам и то, и другое: параллельный доступ (concurrent access) и ссылочную прозрачность (referential transparency), и мы можем использовать их для построения более продвинутых структур, таких как counters (счетчики) и state machines (конечные автоматы).Перед тем, как мы углубимся в Ref и Deferred, нам полезно узнать, что concurrency в Cats строится на Java AtomicReference, и здесь мы и начнем наше путешествие.Atomic ReferenceAtomicReference — это один из элементов пакета java.util.concurrent.atomic. В Oracle docs мы можем прочитать, что java.util.concurrent.atomic — это: Небольшой инструментарий классов, поддерживающих потокобезопасное программирование «без блоков» с одиночными переменными. По сути, классы в данном пакете расширяют понятие volatile значений, полей и элементов массива до тех, которые также обеспечивают условную операцию atomic обновления…Экземпляры классов AtomicBoolean, AtomicInteger, AtomicLong, и AtomicReference обеспечивают доступ и обновление от одиночных переменных к соответствующему типу (функционального блока).
/**
* Atomically increments by one the current value. * * @return the updated value */ public final int incrementAndGet() { for (;;) { int current = get(); int next = current + 1; if (compareAndSet(current, next)) return next; } } abstract class Ref[F[_], A] {
def get: F[A] def set(a: A): F[Unit] def modify[B](f: A => (A, B)): F[B] // ... and more }
F, например, cats.effect.IO.Если мы внимательно посмотрим на сопутствующий объект для Cats Ref, мы увидим, что наша F должна соответствовать некому требованию, а именно быть Sync. def of[F[_], A](a: A)(implicit F: Sync[F]): F[Ref[F, A]] = F.delay(unsafe(a))
delay для каждой операции на Ref.Ref — довольно простая конструкция, мы можем сосредоточиться в основном на ее get, set и of чтобы понять, как она работает.Метод get and set Допустим, у нас есть объект (для этого блога мы назовем его Shared), который нужно обновить несколькими threads, и мы используем для этого наши методы get и set , создавая утилитный метод, который поможет нам в дальнейшем: def modifyShared(trace: Ref[IO, Shared], msg: String): IO[Unit] = {
for { sh <- trace.get() _ <- trace.set(Shared(sh, msg)) } yield () } def update(f: A => A): F[Unit]
for {
_ <- someRef.update(_ + 1) curr <- someRef.get _ <- IO { println(s"current value is $curr")} } yield () def modify[B](f: A => (A, B)): F[B] = {
@tailrec def spin: B = { val c = ar.get val (u, b) = f(c) if (!ar.compareAndSet(c, u)) spin else b } F.delay(spin) }
abstract class Deferred[F[_], A] {
def get: F[A] def complete(a: A): F[Unit] }
def consumer(done: Deferred[IO, Unit]) = for {
c <- Consumer.setup _ <- done.complete(()) msg <- c.read _ <- IO(println(s"Received $msg")) } yield () def producer(done: Deferred[IO, Unit]) = for { p <- Producer.setup() _ <- done.get msg = "Msg A" _ <- p.write(msg) _ <- IO(println(s"Sent $msg")) } yield () def prog = for { d <- Deferred[IO, Unit] _ <- consumer(d).start _ <- producer(d).start } yield () =========== Источник: habr.com =========== =========== Автор оригинала: Krzysztof Grajek ===========Похожие новости:
Блог компании OTUS ), #_programmirovanie ( Программирование ), #_scala |
|
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 18:37
Часовой пояс: UTC + 5