Конкурентност

Досега

  • Програми с мощността на ламбда смятането/машината на Тюринг
  • Последователни изчисления, не се влияят от времето
  • Нямат връзка с околния свят
  • вход => предвидима трансформация => изход
  • Трансформиращи програми
  • Добре изучени, “лесни”

IO

import Console._

val program = for
  _     <- putStrLn("What is your name?")
  name  <- getStrLn
  _     <- putStrLn("Hello, " + name + ", welcome!")
yield ()

program.unsafeRun()

doSomethingElse()
  • Връзка с външния свят
  • Но синхронна – програмата не прави нищо друго докато чака
  • Интерактивни програми

Реалният свят

  • Светът навън е силно паралелен и конкурентен
  • Нещо повече, участниците в него си взаимодействат
  • Развива се във времето
  • Как да моделираме такива програми?

Конкурентност и паралелизъм

parallel
from παρά + ἄλληλος, along each other
concurrent
present active participle of concurrō (“happen at the same time with”), from con- (“with”) + currō (“run”)
concurrent computing
a form of computing in which several computations are executed during overlapping time periods—concurrently—instead of sequentially

Конкурентност и паралелизъм

  • конкурентност – когато няколко задачи имат възможност да прогресират в рамките на един и същи период от време, но не задължително едновременно
    • пример: multitasking позволява различни нишки, споделящи си едно процесорно ядро, да прогресират заедно, редувайки се коя ще използва ядрото
  • паралелизъм – когато конкурентни задачи биват изпълнявани едновременно (по едно и също време)
    • пример: наличието на няколко ядра би ни позволило няколко нишки да прогресират едновременно

Конкурентност

В изчислителен контекст:

  • конкурентността се отнася към структурата на програмата,
  • паралелизмът – към хардуера и как тя ще бъде изпълнявана.

Конкурентните програми са композитност от unit-и от изчисления, които, веднъж дефинирани, могат да бъдат изпълнени независимо едно от друго.

Дистрибутираност

Независими компоненти със собствени изчислителни ресурси и памет, които комуникират помежду си чрез съобщения и изграждат цялостна система

(обичайно) работят конкурентно и паралелно един спрямо друг

“A distributed system is one in which the failure of a computer you didn’t even know existed can render your own computer unusable.”

Реактивност

Свойството на програмите/компонентите да реагират
на света около тях (с което да са част от него)

Конкурентни модели

  • От ниско ниво
    • Нишки
    • I/O и TCP/IP конкурентност
  • Абстракции
    • Callbacks & event loop
    • Future/Promise & IO
    • Актьорски модел
    • Communicating Sequential Processes
    • Stream/dataflow конкурентност
    • …нещо друго?

Какво би направило един модел подходящ?

  • Ease to reason about it
  • Expressive
  • Safe
  • Doesn’t hide possibility for errors (e.g. in a distributed setting)
  • Composable
  • Functional programming to the rescue

Нишки

  • Кой е ползвал НИШКИ?

Как си комуникират нишките?

…чрез споделено състояние!

  • За да бъде смислен, всеки конкурентен примитив е нужно да има поне една интеракция с околния свят или с други примитиви

  • Границите с дистрибутираните системи се размиват

  • Нишки – чрез споделена памет и средства на процесора и ОС

  • Модел на синхронизация:

Видимост между нишки

Видимост чрез volatile

Видимост между нишки –
happens-before релация

  • Една нишка A вижда промените от друга B само ако те са се случили преди определено действие на A
  • Релацията се случва преди се определя много строго по следната дефиниция:
    • Запис във volatile променлива се случва преди последващо нейно прочитане (от същата или друга нишка)
    • За всеки два последователни statement-а в една нишка, първият се случва преди вторият
    • Релацията е транзитивно затворена
  • Образува се частична наредба между нишките

Видимост между нишки –
happens-before релация

  • Всички други средства за конкурентност на Java (lock-ове, AtomicReference, конкурентни колекции и др.) използват вътрешно volatile
  • Следователно дефиницията от предния слайд важи и за тях

Видимост между нишки

  • Допълнително JVM ни гарантира:
    • всяка референция към immutable обект сочи към обект с напълно валидно състояние
  • неизменимостта премахва огромен клас от възможни грешки

Проблеми на нишките

  • Тежки – всяка има стек, регистри, превключването е бавно и минава през ядрото на ОС, стартират бавно
  • Липсва реактивност
    • но могат активно да създадат други нишки
    • и да променят споделено състояние
  • Комплексни
    • комуникират чрез споделено състояние
    • или чрез синхронни и блокиращи операции
    • нишките са няколко императивни програми, изпълняващи се конкурентно
    • всяка със собствена времева линия, но недетерминистично преплетени на моменти
  • Не се композират
  • The Problem with Threads

Как да решим проблемите?

Thread pools

  • вместо да стартираме и спираме нишки, ще опитаме да ги преизползваме
  • ще разделим работата на задачи – можем да използваме Runnable интерфейса за тях
  • задачите се изпращат към pool от нишки, готови да ги изпълнят, като ако няма свободна – изчакват
  • всяка задача може да добави нови задачи по време на своето изпълнение

Callbacks

def doSomething(onComplete: Result => Unit): Unit
  • Задействат се при завършване на работа или при определено събитие (естествени за event loop)
  • Асинхронни и реактивни
  • Ще ги изпълним върху pool от нишки (брой = ~брой ядра)
def doSomethingDangerous(onComplete: Try[Result] => Unit): Unit

Callbacks – негативи

  • Императивни, работят с mutable state
  • Некомпозитни. Callback hell
  • Ако се изпълняват в различни нишки, изискват синхронизация
  • Ръчно спряване с грешки

Future

Какво бяхме постигнали с IO?

  • Страничен ефект => функционален ефект
  • Защо това ни харесва?
  • Функционално композиране на ефекти (по определена операция)
  • Странични ефекти само при изпълнение

Как би изглеждал Future

Защо точно тези операции?

При императивното всяка нишка описва собствена времева линия, преплитаща се с другите

Функционално програмиране

Изрази

val a = 10
val b = 40

val c = a * 10
val d = (a, b)
val e = a + b

def f(x: Int): String = ???
def g(x: String): Boolean = ???

def r = g(f(a))

Ефектни изрази

val a = Future(10)
val b = Future(40)

val c = a.map(_ * 10)
val d = a zip b
val e = a.zipMap(b)(_ + _)

def f(n: Int): Future[String] = ???
def g(str: String): Future[Boolean] = ???

def r = a.flatMap(f).flatMap(g)

Ефектни изрази

val a = Future(10)
val b = Future(40)

val c = a.map(_ * 10)
val d = a zip b
val e = a.zipMap(b)(_ + _)

def f(n: Int): Future[String] = ???
def g(n: Int, str: String): Future[Boolean] = ???

def r = for
  n <- a
  str <- f(n)
  bool <- g(n, str)
yield bool

Имплементация. Eager или lazy?

  • Ще разгледаме и двата варианта
  • Стандартния Scala Future работи eagerly
    • т.е. изчислението започва веднага след дефинирането на Future-а, стига да станат готови всички зависимости
    • композицията на ефектите остава функционално
  • Следващия път ще разгледаме lazy вариант, който ще наречем асинхронен IO
    • започването на изчислението се отлага докато не извикаме напр. unsafeRunAsync

Имплементация

Реактивност на Future

  • Ще се базираме на callback – onComplete
  • Функционалните трансформации – чрез onComplete
  • Трансформиран Future също е реактивен
  • Как да го вържем с външни източници на събития?
  • Promise
    • Promise-ите ще генерират първоначални Future-и в нашата система
    • Адаптер към външния свят – реагират на събития за вход/изход

Примери

Immutability

Future е безопасен, само ако стойностите в него са immutable!

Ако не са, то тяхното състояние може да е неизвестно

Асинхронен вход/изход чрез Future

Как работи HTTP?

Future в Scala

  • scala.concurrent.Future
  • Използва ExecutionContext вместо Executor
  • default: import scala.concurrent.ExecutionContext.Implicits.global

От асинхронност към блокиране?

  • Await.result(future: Future[A], atMost: Duration): A
  • Използвайте рядко, например само при краищата на програмата или в тестове
  • В Scala считаме блокирането на нишки за скъпо и предпочитаме асинхронност

Предния път

  • Трансформиращи, интерактивни и реактивни системи
  • Конкурентност и паралелизъм
  • Дистрибутираност
  • Проблеми с нишките. Работа с нишки. Видимост между нишки
  • Преизползване на нишки чрез thread pools
  • Реактивност и асинхронност чрез calllbacks
  • ФП подход – композитност и безопасност чрез асинхроннен Future
  • Защо map, flatMap, zip? 🧙‍
  • Връзка към външния свят. Promise. Пример с адаптиране на HTTP библиотека

Съществуваща стойност към Future

Future.successful(value)
Future.failed(exception)

Web библиотека (демо)

Полезни функции

  • recover
  • recoverWith
  • fallbackTo
  • Future.sequence
  • Future.firstCompletedOf

Recover

def doSomethingForUser(userId: Id): Future[HttpResponse] =(for
  user <- retrieveUser(userId)
  result <- doService(user, input)
yield Ok(constructBody(result))) recover {
  case UserNotFound(userId) => NotFound
  case _: ActionNotPermitted => Unauthorized
}

Recover with друга, по-стабилна алтернатива

def dangerousService(input: ServiceInput): Future[Result] = ???
def safeService(input: ServiceInput): Future[Result] = ???

def calculate(input: ServiceInput): Future[Result] =
  dangerousService(input).recoverWith {
    case _: ServiceFailure => safeService(input)
  }

Fallback to

def dangerousService(input: ServiceInput): Future[Result] = ???
def safeService(input: ServiceInput): Future[Result] = ???

def calculate(input: ServiceInput): Future[Result] =
  dangerousService(input).fallbackTo(safeService(input))

За разлика от предишния вариант, тук safeService се изчислява едновременно със dangerousService

Състезание

def algorithm1(input: String): Future[Int] = ???
def algorithm2(input: String): Future[Int] = ???

def calculation(input: Stirng): Future[Int] =
  Future.firstCompletedOf(List(algorithm1(input), algorithm2(input)))

Неопределен брой независими изчисления

def retrieveAge(userId: Id): Future[Int] = ???

def averageFriendsAge(user: User): Future[Option[Int]] = for
  friendIds: List[Id] <- user.friends
  friendsAges: List[Future[Int]] = friendIds.map(retrieveAge)
  ages <- Future.sequence(friendsAges): Future[List[Int]]
yield average(ages)

Референтна прозрачност

def calc[T](expr: => T) = Future {
  Thread.sleep(4000)

  expr
}
val futureA = calc(42)
val futureB = calc(10)

val sum = for
  a <- futureA
  b <- futureB
yield a + b

println {
  Await.result(sum, 5.seconds)
}
> 52

Референтна прозрачност

def calc[T](expr: => T) = Future {
  Thread.sleep(4000)

  expr
}



val sum = for
  a <- calc(42)
  b <- calc(10)
yield a + b

println {
  Await.result(sum, 5.seconds)
}
> Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [5 seconds]

Правилният начин да постигнем паралелилизъм:

val sum = for
  (a, b) <- calc(42).zip(calc(10))
yield a + b

println {
  Await.result(sum, 5.seconds)
}
> 52

Референтна прозрачност (пример 2)

def calc[T](expr: => T) = Future {
  println("Hello")
  
  Thread.sleep(4000)

  expr
}
val futureCalc = calc(42)

val sum = for
  (a, b) <- futureCalc zip futureCalc
yield a + b

println {
  Await.result(sum, 5.seconds)
}
Hello
> 52

Референтна прозрачност (пример 2)

def calc[T](expr: => T) = Future {
  println("Hello")
  
  Thread.sleep(4000)

  expr
}


val sum = for
  (a, b) <- calc(42) zip calc(42)
yield a + b

println {
  Await.result(sum, 5.seconds)
}
Hello
Hello
> 52

Как да се справим с това?

Lazy Future/IO (демо) 😊

Въпроси :)?

Докъде сме?

Следващия път ще си поговорим повече за абстракции