Досега
- Програми с мощността на ламбда смятането/машината на Тюринг
- Последователни изчисления, не се влияят от времето
- Нямат връзка с околния свят
- вход => предвидима трансформация => изход
- Трансформиращи програми
- Добре изучени, “лесни”
IO
import Console._
val program = for
_ <- putStrLn("What is your name?")
name <- getStrLn
_ <- putStrLn("Hello, " + name + ", welcome!")
yield ()
program.unsafeRun()
doSomethingElse()
- Връзка с външния свят
- Но синхронна – програмата не прави нищо друго докато чака
- Интерактивни програми
Реалният свят
- Светът навън е силно паралелен и конкурентен
- Нещо повече, участниците в него си взаимодействат
- Развива се във времето
- Как да моделираме такива програми?
Конкурентност и паралелизъм
-
parallel
-
from παρά + ἄλληλος, along each other
-
concurrent
-
present active participle of concurrō (“happen at the same time with”), from con- (“with”) + currō (“run”)
-
concurrent computing
-
a form of computing in which several computations are executed during overlapping time periods—concurrently—instead of sequentially
Конкурентност и паралелизъм
- конкурентност – когато няколко задачи имат възможност да прогресират в рамките на един и същи период от време, но не задължително едновременно
- пример: multitasking позволява различни нишки, споделящи си едно процесорно ядро, да прогресират заедно, редувайки се коя ще използва ядрото
- паралелизъм – когато конкурентни задачи биват изпълнявани едновременно (по едно и също време)
- пример: наличието на няколко ядра би ни позволило няколко нишки да прогресират едновременно
Конкурентност
В изчислителен контекст:
- конкурентността се отнася към структурата на програмата,
- паралелизмът – към хардуера и как тя ще бъде изпълнявана.
Конкурентните програми са композитност от unit-и от изчисления, които, веднъж дефинирани, могат да бъдат изпълнени независимо едно от друго.
Дистрибутираност
Независими компоненти със собствени изчислителни ресурси и памет, които комуникират помежду си чрез съобщения и изграждат цялостна система
(обичайно) работят конкурентно и паралелно един спрямо друг
“A distributed system is one in which the failure of a computer you didn’t even know existed can render your own computer unusable.”
Реактивност
Свойството на програмите/компонентите да реагират
на света около тях (с което да са част от него)
Конкурентни модели
- От ниско ниво
- Нишки
- I/O и TCP/IP конкурентност
- Абстракции
- Callbacks & event loop
- Future/Promise & IO
- Актьорски модел
- Communicating Sequential Processes
- Stream/dataflow конкурентност
- …нещо друго?
Какво би направило един модел подходящ?
- Ease to reason about it
- Expressive
- Safe
- Doesn’t hide possibility for errors (e.g. in a distributed setting)
- Composable
- Functional programming to the rescue
Как си комуникират нишките?
![]()
…чрез споделено състояние!
За да бъде смислен, всеки конкурентен примитив е нужно да има поне една интеракция с околния свят или с други примитиви
Границите с дистрибутираните системи се размиват
Нишки – чрез споделена памет и средства на процесора и ОС
Модел на синхронизация:
![]()
Видимост между нишки –
happens-before релация
- Една нишка A вижда промените от друга B само ако те са се случили преди определено действие на A
- Релацията се случва преди се определя много строго по следната дефиниция:
- Запис във
volatile
променлива се случва преди последващо нейно прочитане (от същата или друга нишка)
- За всеки два последователни statement-а в една нишка, първият се случва преди вторият
- Релацията е транзитивно затворена
- Образува се частична наредба между нишките
Видимост между нишки –
happens-before релация
- Всички други средства за конкурентност на Java (lock-ове, AtomicReference, конкурентни колекции и др.) използват вътрешно volatile
- Следователно дефиницията от предния слайд важи и за тях
Видимост между нишки
- Допълнително JVM ни гарантира:
- всяка референция към immutable обект сочи към обект с напълно валидно състояние
- неизменимостта премахва огромен клас от възможни грешки
Проблеми на нишките
- Тежки – всяка има стек, регистри, превключването е бавно и минава през ядрото на ОС, стартират бавно
- Липсва реактивност
- но могат активно да създадат други нишки
- и да променят споделено състояние
- Комплексни
- комуникират чрез споделено състояние
- или чрез синхронни и блокиращи операции
- нишките са няколко императивни програми, изпълняващи се конкурентно
- всяка със собствена времева линия, но недетерминистично преплетени на моменти
- Не се композират
- The Problem with Threads
Thread pools
- вместо да стартираме и спираме нишки, ще опитаме да ги преизползваме
- ще разделим работата на задачи – можем да използваме
Runnable
интерфейса за тях
- задачите се изпращат към pool от нишки, готови да ги изпълнят, като ако няма свободна – изчакват
- всяка задача може да добави нови задачи по време на своето изпълнение
Callbacks
def doSomething(onComplete: Result => Unit): Unit
- Задействат се при завършване на работа или при определено събитие (естествени за event loop)
- Асинхронни и реактивни
- Ще ги изпълним върху pool от нишки (брой = ~брой ядра)
def doSomethingDangerous(onComplete: Try[Result] => Unit): Unit
Callbacks – негативи
- Императивни, работят с mutable state
- Некомпозитни. Callback hell
- Ако се изпълняват в различни нишки, изискват синхронизация
- Ръчно спряване с грешки
Future
Какво бяхме постигнали с IO
?
- Страничен ефект => функционален ефект
- Защо това ни харесва?
- Функционално композиране на ефекти (по определена операция)
- Странични ефекти само при изпълнение
Защо точно тези операции?
При императивното всяка нишка описва собствена времева линия, преплитаща се с другите
Функционално програмиране
![]()
Изрази
val a = 10
val b = 40
val c = a * 10
val d = (a, b)
val e = a + b
def f(x: Int): String = ???
def g(x: String): Boolean = ???
def r = g(f(a))
Ефектни изрази
val a = Future(10)
val b = Future(40)
val c = a.map(_ * 10)
val d = a zip b
val e = a.zipMap(b)(_ + _)
def f(n: Int): Future[String] = ???
def g(str: String): Future[Boolean] = ???
def r = a.flatMap(f).flatMap(g)
Ефектни изрази
val a = Future(10)
val b = Future(40)
val c = a.map(_ * 10)
val d = a zip b
val e = a.zipMap(b)(_ + _)
def f(n: Int): Future[String] = ???
def g(n: Int, str: String): Future[Boolean] = ???
def r = for
n <- a
str <- f(n)
bool <- g(n, str)
yield bool
Имплементация. Eager или lazy?
- Ще разгледаме и двата варианта
- Стандартния Scala Future работи eagerly
- т.е. изчислението започва веднага след дефинирането на Future-а, стига да станат готови всички зависимости
- композицията на ефектите остава функционално
- Следващия път ще разгледаме lazy вариант, който ще наречем асинхронен IO
- започването на изчислението се отлага докато не извикаме напр.
unsafeRunAsync
Реактивност на Future
- Ще се базираме на callback –
onComplete
- Функционалните трансформации – чрез
onComplete
- Трансформиран
Future
също е реактивен
- Как да го вържем с външни източници на събития?
- Promise
- Promise-ите ще генерират първоначални Future-и в нашата система
- Адаптер към външния свят – реагират на събития за вход/изход
Immutability
Future е безопасен, само ако стойностите в него са immutable!
Ако не са, то тяхното състояние може да е неизвестно
Асинхронен вход/изход чрез Future
Future
в Scala
scala.concurrent.Future
- Използва
ExecutionContext
вместо Executor
- default:
import scala.concurrent.ExecutionContext.Implicits.global
От асинхронност към блокиране?
Await.result(future: Future[A], atMost: Duration): A
- Използвайте рядко, например само при краищата на програмата или в тестове
- В Scala считаме блокирането на нишки за скъпо и предпочитаме асинхронност
Предния път
- Трансформиращи, интерактивни и реактивни системи
- Конкурентност и паралелизъм
- Дистрибутираност
- Проблеми с нишките. Работа с нишки. Видимост между нишки
- Преизползване на нишки чрез thread pools
- Реактивност и асинхронност чрез calllbacks
- ФП подход – композитност и безопасност чрез асинхроннен Future
- Защо
map
, flatMap
, zip
? 🧙
- Връзка към външния свят. Promise. Пример с адаптиране на HTTP библиотека
Съществуваща стойност към Future
Future.successful(value)
Future.failed(exception)
Полезни функции
recover
recoverWith
fallbackTo
Future.sequence
Future.firstCompletedOf
Recover
def doSomethingForUser(userId: Id): Future[HttpResponse] =(for
user <- retrieveUser(userId)
result <- doService(user, input)
yield Ok(constructBody(result))) recover {
case UserNotFound(userId) => NotFound
case _: ActionNotPermitted => Unauthorized
}
Recover with друга, по-стабилна алтернатива
def dangerousService(input: ServiceInput): Future[Result] = ???
def safeService(input: ServiceInput): Future[Result] = ???
def calculate(input: ServiceInput): Future[Result] =
dangerousService(input).recoverWith {
case _: ServiceFailure => safeService(input)
}
Fallback to
def dangerousService(input: ServiceInput): Future[Result] = ???
def safeService(input: ServiceInput): Future[Result] = ???
def calculate(input: ServiceInput): Future[Result] =
dangerousService(input).fallbackTo(safeService(input))
За разлика от предишния вариант, тук safeService
се изчислява едновременно със dangerousService
Състезание
def algorithm1(input: String): Future[Int] = ???
def algorithm2(input: String): Future[Int] = ???
def calculation(input: Stirng): Future[Int] =
Future.firstCompletedOf(List(algorithm1(input), algorithm2(input)))
Неопределен брой независими изчисления
def retrieveAge(userId: Id): Future[Int] = ???
def averageFriendsAge(user: User): Future[Option[Int]] = for
friendIds: List[Id] <- user.friends
friendsAges: List[Future[Int]] = friendIds.map(retrieveAge)
ages <- Future.sequence(friendsAges): Future[List[Int]]
yield average(ages)
Референтна прозрачност
def calc[T](expr: => T) = Future {
Thread.sleep(4000)
expr
}
val futureA = calc(42)
val futureB = calc(10)
val sum = for
a <- futureA
b <- futureB
yield a + b
println {
Await.result(sum, 5.seconds)
}
Референтна прозрачност
def calc[T](expr: => T) = Future {
Thread.sleep(4000)
expr
}
val sum = for
a <- calc(42)
b <- calc(10)
yield a + b
println {
Await.result(sum, 5.seconds)
}
> Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [5 seconds]
Правилният начин да постигнем паралелилизъм:
val sum = for
(a, b) <- calc(42).zip(calc(10))
yield a + b
println {
Await.result(sum, 5.seconds)
}
> 52
Референтна прозрачност (пример 2)
def calc[T](expr: => T) = Future {
println("Hello")
Thread.sleep(4000)
expr
}
val futureCalc = calc(42)
val sum = for
(a, b) <- futureCalc zip futureCalc
yield a + b
println {
Await.result(sum, 5.seconds)
}
Референтна прозрачност (пример 2)
def calc[T](expr: => T) = Future {
println("Hello")
Thread.sleep(4000)
expr
}
val sum = for
(a, b) <- calc(42) zip calc(42)
yield a + b
println {
Await.result(sum, 5.seconds)
}
Как да се справим с това?
Докъде сме?
Следващия път ще си поговорим повече за абстракции