Досега
- Програми с мощността на ламбда смятането/машината
на Тюринг
- Нямат връзка с околния свят
- вход => предвидима трансформация =>
изход
- Трансформиращи програми
- Добре изучени, “лесни”
IO
import Console._
val program = for
_ <- putStrLn("What is your name?")
name <- getStrLn
_ <- putStrLn("Hello, " + name + ", welcome!")
yield ()
program.unsafeRun()
- Връзка с външния свят
- Но синхронна – програмата не прави нищо друго
докато чака
- Интерактивни програми
Реалният свят
- Светът навън е силно паралелен и конкурентен
- Нещо повече, участниците в него си взаимодействат и
реагират едни на други
- Развива се във времето
- Реактивни програми
- “Трудни”, но и най-интересни :)
- Как да ги моделираме?
Конкурентност и паралелизъм
-
parallel
-
from παρά + ἄλληλος, along each other
-
concurrent
-
present active participle of concurrō (“happen at the same time with”),
from con- (“with”) + currō (“run”)
-
concurrent computing
-
a form of computing in which several computations are executed during
overlapping time periods—concurrently—instead of sequentially
Конкурентност и паралелизъм
- конкурентност – когато няколко задачи имат
възможност да прогресират в рамките на един и същи период от време, но
не задължително едновременно
- пример: multitasking позволява различни нишки,
споделящи си едно процесорно ядро, да прогресират заедно, редувайки се
коя ще използва ядрото
- паралелизъм – когато конкурентни задачи биват
изпълнявани едновременно (по едно и също време)
- пример: наличието на няколко ядра би ни позволило
няколко нишки да прогресират едновременно
Конкурентност и паралелизъм
![]()
Конкурентност
В изчислителен контекст:
- конкурентността се отнася към структурата
на програмата,
- паралелизмът – към хардуера и как тя ще
бъде изпълнявана.
Конкурентните програми са композитност от unit-и от изчисления, които,
веднъж дефинирани, могат да бъдат изпълнени независимо едно от друго.
Дистрибутирани системи
Независими компоненти със собствени изчислителни ресурси и памет,
които комуникират помежду си чрез съобщения и изграждат цялостна
система
(обичайно) работят конкурентно и паралелно един спрямо друг
“A distributed system is one in which the failure of a computer you
didn’t even know existed can render your own computer unusable.”
Реактивност
Свойството на програмите/компонентите да реагират
на света около
тях (с което да са част от него)
Конкурентни модели
- От ниско ниво
- Нишки
- I/O и TCP/IP конкурентност
- Абстракции
- Callbacks & event loop
- Future/Promise & IO
- Актьорски модел
- Communicating Sequential Processes
- Stream/dataflow конкурентност
- …нещо друго?
Какво би направило един модел подходящ?
- Ease to reason about it
- Expressive
- Safe
- Doesn’t hide possibility for errors (e.g. in a distributed setting)
- Composable
- Functional programming to the rescue
Нишки
![]()
Кой е ползвал НИШКИ?
Time slicing
Нишките се управляват от OS scheduler:
![]()
Lifecycle
![]()
Как си комуникират нишките?
![]()
…чрез споделено състояние!
За да бъде смислен, всеки конкурентен примитив е
нужно да има поне една интеракция с околния свят или с други
примитиви
Границите с дистрибутираните системи се
размиват
Нишки – чрез споделена памет и средства на
процесора и ОС
Модел на синхронизация:
![]()
Проблеми със споделеното състояние
- Видимост
- ако нишка A промени стойност, тя ще бъде ли видяна
от нишка B?
- Race condition
- когато стигнем до невалидно или неконсистентно
състояние
- най-често ако една нишка презапише резултатът от
действията на друга
Видимост между нишки –
happens-before релация
- Една нишка A вижда промените от друга B само ако те
са се случили преди определено действие на A
- Релацията се случва преди се
определя много строго по следната дефиниция:
- Запис във
volatile
променлива
се случва преди последващо нейно прочитане (от същата
или друга нишка)
- За всеки два последователни statement-а в една
нишка, първият се случва преди вторият
- Релацията е транзитивно затворена
- Образува се частична наредба между нишките
Видимост между нишки –
happens-before релация
- Всички други средства за конкурентност на Java също
използват volatile семантиката
- Следователно дефиницията от предния слайд важи и за
тях
Видимост между нишки
- Допълнително JVM ни гарантира:
- всяка референция към immutable обект сочи към обект
с напълно валидно състояние
- неизменимостта премахва огромен клас от възможни
грешки
Проблеми на нишките
- Стартират бавно
- Тежки
- всяка има стек, регистри
- превключването между нишки е бавно и минава през
ядрото на ОС
- Липсва реактивност
- Комплексни
- комуникират чрез споделено състояние
- изискват синхронизация
- нишките са няколко императивни програми,
изпълняващи се конкурентно
- всяка със собствена времева линия, но
недетерминистично преплетени на моменти
- Не се композират
- The
Problem with Threads
Бавно стартиране – Thread pools
![]()
- вместо да стартираме и спираме нишки, ще опитаме да
ги преизползваме
- ще разделим работата на задачи – можем да
използваме
Runnable
интерфейса за тях
- задачите се изпращат към pool от нишки, готови да
ги изпълнят, като ако няма свободна – изчакват
- всяка задача може да добави нови задачи по време на
своето изпълнение
![]()
Ще ги разгледаме по-подробно по-късно
Реактивност – Callbacks
def doSomething(onComplete: Result => Unit): Unit
- Задействат се при завършване на работа или при
определено събитие (естествени за event
loop)
- Асинхронни и реактивни
- Ще ги изпълним върху pool от нишки (брой = ~брой
ядра)
def doSomethingDangerous(onComplete: Try[Result] => Unit): Unit
Callback hell
![]()
Callbacks – негативи
- Императивни, работят с mutable state
- Некомпозитни. Callback hell
- Ако се изпълняват в различни нишки, изискват
синхронизация
- Ръчно спряване с грешки
Конкурентен IO
Какво бяхме постигнали с IO
?
- Страничен ефект => функционален ефект
- Защо това ни харесва?
- Функционално композиране на ефекти
- Странични ефекти само при изпълнение
Как би изглеждал конкурентен IO
?
Защо точно тези операции?
При императивното всяка нишка описва собствена времева линия,
преплитаща се с другите
Функционално програмиране
![]()
Изрази
val a = 10
val b = 40
val c = a * 10
val d = (a, b)
val e = a + b
def f(x: Int): String = ???
def g(x: String): Boolean = ???
def r = g(f(a))
Ефектни изрази
val a = IO(10)
val b = IO(40)
val c = a.map(_ * 10)
val d = a zip b
val e = a.zipMap(b)(_ + _)
def f(n: Int): IO[String] = ???
def g(str: String): IO[Boolean] = ???
def r = a.flatMap(f).flatMap(g)
Ефектни изрази
val a = IO(10)
val b = IO(40)
val c = a.map(_ * 10)
val d = a zip b
val e = a.zipMap(b)(_ + _)
def f(n: Int): IO[String] = ???
def g(n: Int, str: String): IO[Boolean] = ???
def r = for
n <- a
str <- f(n)
bool <- g(n, str)
yield bool
Immutability
IO е безопасен, само ако стойностите в него са immutable!
Ако не са, то тяхното състояние може да е неизвестно
Какви thread pools да ползваме? CPU-bound vs IO-bound
CPU-bounded задачи
- Препоръка: thread pool с размер броя ядра
- Повече няма да подобрят производителността
- Приложния код и леки трансформации също могат да вървят в този
thread pool
- Често имплементират work-stealing алгоритми (напр. Fork Join
Pool)
![]()
IO-bounded задачи
- При синхронен вход/изход всяка операции блокира нишката
- Тя не прави нищо през това време
- Ако искаме да постигнем добро ниво на конкурентност са ни нужни
повечко нишки
- CachedThreadPool позволява добавяне на нишки при нужда
![]()
Event-dispatcher IO
- Една нишка се справя с множество IO операции
- Имплементира се event loop
- Още се нарича reactor pattern
- Използва се една или малко на брой нишки
- Препоръчва се в съвременни приложения с много клиенти
![]()
Асинхронен вход/изход чрез IO
Какво направихме предния път?
IO, със следната функционалност:
- Delayed
- поддържа пускане на изчисление асинхронно (в CPU-bounded thread
pool)
- Блокиращ Delayed
- позволява да изпълним синхронен блокиращ вход/изход в отделен thread
pool (IO-bounded)
- Concurrent
- може да изпълни две IO-та конкурентно едно от друго и да изчака
резултата им
- Async
- може да адаптира callback-базиран API, включително такива за
неблокиращ вход/изход
IO
със Cats Effect
![]()
Cats Effect IO
разлики
Нашето IO |
Cats Effect IO |
IO.apply, IO.blocking |
IO.apply, IO.blocking |
IO.of, IO.error |
IO.pure, IO.raiseError |
IO.println, IO.readln |
IO.println, IO.readline |
IO.async |
IO.async_, IO.async |
map, flatMap, >>=, >> |
map, flatMap, >>=, >> |
flatMapError |
handleErrorWith |
zip, zipMap |
parProduct, – |
– |
parTupled, parMapN |
Някои от тези изискват import cats.syntax.all.*
Използване на Cats Effect IO
(демо)
Полезни функции
parSequence
recover
recoverWith
orElse
IO.race
Неопределен брой независими изчисления
def retrieveAge(userId: Id): IO[Int] = ???
def averageFriendsAge(user: User): IO[Option[Int]] = for
friendIds: List[Id] <- user.friends
friendsAges: List[IO[Int]] = friendIds.map(retrieveAge)
ages <- friendsAges.sequence: IO[List[Int]]
yield average(ages)
Recover
def doSomethingForUser(userId: Id): IO[HttpResponse] =
(for
user <- retrieveUser(userId)
result <- doService(user, input)
yield Ok(constructBody(result))).recover:
case UserNotFound(userId) => NotFound
case _: ActionNotPermitted => Unauthorized
Recover with друга, по-стабилна алтернатива
def dangerousService(input: ServiceInput): IO[Result] = ???
def safeService(input: ServiceInput): IO[Result] = ???
def calculate(input: ServiceInput): IO[Result] =
dangerousService(input).recoverWith:
case _: ServiceFailure => safeService(input)
Fallback
def dangerousService(input: ServiceInput): IO[Result] = ???
def safeService(input: ServiceInput): IO[Result] = ???
def calculate(input: ServiceInput): IO[Result] =
dangerousService(input).orElse(safeService(input))
Състезание
def algorithm1(input: String): IO[Int] = ???
def algorithm2(input: String): IO[Int] = ???
def calculation(input: Stirng): IO[Int] =
IO.race(algorithm1(input), algorithm2(input))
Future и Promise
- Част от стандартната библиотека на Scala
- Стартират изпълнение веднага щом бъдат дефинирани
(а не при unsafeRun)
- това води до някои проблеми
- Подобни на CompletableFuture в Java и Promise в
JavaScript
Future и Promise API
Future |
Cats Effect IO |
Future.apply |
IO.apply, IO.blocking |
Future.successful, Future.failed |
IO.pure, IO.raiseError |
– |
IO.println, IO.readline |
Promise, promise.complete |
IO.async_, IO.async |
map, flatMap |
map, flatMap, >>=, >> |
transform, recoverWith, .. . |
handleErrorWith и други |
zip |
parProduct, – |
zip, zipWith |
parTupled, parMapN |
Future.sequence |
sequence |
fallback |
orElse |
Референтна прозрачност
def calc[T](expr: => T) = Future {
Thread.sleep(4000)
expr
}
val futureA = calc(42)
val futureB = calc(10)
val sum = for
a <- futureA
b <- futureB
yield a + b
println {
Await.result(sum, 5.seconds)
}
Референтна прозрачност
def calc[T](expr: => T) = Future {
Thread.sleep(4000)
expr
}
val sum = for
a <- calc(42)
b <- calc(10)
yield a + b
println {
Await.result(sum, 5.seconds)
}
> Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [5 seconds]
Правилният начин да постигнем паралелилизъм:
val sum = for
(a, b) <- calc(42).zip(calc(10))
yield a + b
println {
Await.result(sum, 5.seconds)
}
> 52
Референтна прозрачност (пример 2)
def calc[T](expr: => T) = Future {
println("Hello")
Thread.sleep(4000)
expr
}
val futureCalc = calc(42)
val sum = for
(a, b) <- futureCalc zip futureCalc
yield a + b
println {
Await.result(sum, 5.seconds)
}
Референтна прозрачност (пример 2)
def calc[T](expr: => T) = Future {
println("Hello")
Thread.sleep(4000)
expr
}
val sum = for
(a, b) <- calc(42) zip calc(42)
yield a + b
println {
Await.result(sum, 5.seconds)
}
Докъде сме?
Следващия път ще си поговорим повече за абстракции