Конкурентност

Досега

  • Програми с мощността на ламбда смятането/машината на Тюринг
  • Последователни изчисления, не се влияят от времето
  • Нямат връзка с околния свят
  • вход => предвидима трансформация => изход
  • Трансформиращи програми
  • Добре изучени, “лесни”

IO

import Console._

val program = for {
  _     <- putStrLn("What is your name?")
  name  <- getStrLn
  _     <- putStrLn("Hello, " + name + ", welcome!")
} yield ()

program.unsafeRun()

doSomethingElse()
  • Връзка с външния свят
  • Но синхронна – програмата не прави нищо друго докато чака
  • Интерактивни програми

Реалният свят

  • Светът навън е силно паралелен и конкурентен
  • Нещо повече, участниците в него си взаимодействат
  • Развива се във времето
  • Как да моделираме такива програми?

Конкурентност и паралелизъм

parallel
from παρά + ἄλληλος, along each other
concurrent
present active participle of concurrō (“happen at the same time with”), from con- (“with”) + currō (“run”)
concurrent computing
a form of computing in which several computations are executed during overlapping time periods—concurrently—instead of sequentially

Конкурентност

В изчислителен контекст:

  • конкурентността се отнася към структурата на програмата,
  • паралелизмът, дистрибутираността – към хардуера и как тя ще бъде изпълнявана.

Конкурентните програми са композитност от unit-и от изчисления, които, веднъж дефинирани, могат да бъдат изпълнени независимо едно от друго.

Дистрибутираност

Независими компоненти със собствени изчислителни ресурси и памет, които комуникират помежду си чрез съобщения и изграждат цялостна система

“A distributed system is one in which the failure of a computer you didn’t even know existed can render your own computer unusable.”

Реактивност

Свойството на програмите/компонентите да реагират
на света около тях (с което да са част от него)

Конкурентни модели

  • От ниско ниво
    • Нишки
    • I/O и TCP/IP конкурентност
  • Абстракции
    • Callbacks & event loop
    • Future/Promise & IO
    • Актьорски модел
    • Communicating Sequential Processes
    • Stream/dataflow конкурентност
    • …нещо друго?

Какво би направило един модел подходящ?

  • Ease to reason about it
  • Expressive
  • Safe
  • Doesn’t hide possibility for errors (e.g. in a distributed setting)
  • Composable
  • Functional programming to the rescue

Нишки

  • Кой е ползвал НИШКИ?

Как си комуникират нишките?

…чрез споделено състояние!

  • За да бъде смислен, всеки конкурентен примитив е нужно да има поне една интеракция с околния свят или с други примитиви

  • Границите с дистрибутираните системи се размиват

  • Нишки – чрез споделена памет и средства на процесора и ОС

  • Модел на синхронизация:

Видимост между нишки

Видимост чрез volatile

Видимост между нишки –
happens-before релация

  • Една нишка A вижда промените от друга B само ако те са се случили преди определено действие на A
  • Релацията се случва преди се определя много строго по следната дефиниция:
    • Запис във volatile променлива се случва преди последващо нейно прочитане (от същата или друга нишка)
    • За всеки два последователни statement-а в една нишка, първият се случва преди вторият
    • Релацията е транзитивно затворена
  • Образува се частична наредба между нишките

Видимост между нишки –
happens-before релация

  • Всички други средства за конкурентност на Java (lock-ове, AtomicReference, конкурентни колекции и др.) използват вътрешно volatile
  • Следователно дефиницията от предния слайд важи и за тях

Видимост между нишки

  • Допълнително JVM ни гарантира:
    • всяка референция към immutable обект сочи към обект с напълно валидно състояние
  • неизменимостта премахва огромен клас от възможни грешки

Проблеми на нишките

  • Тежки – всяка има стек, регистри, превключването е бавно и минава през ядрото на ОС, стартират бавно
  • Липсва реактивност
    • но могат активно да създадат други нишки
    • и да променят споделено състояние
  • Комуникират
    • чрез споделено състояние
    • чрез синхронни и блокиращи операции – докато операцията не завърши нишката не може да прави нищо друго
  • Не се композират
  • The Problem with Threads

Callbacks

def doSomething(onComplete: Result => Unit): Unit
  • Асинхронни и реактивни
  • Задействат се при завършване на работа или при определено събитие (естествени за event loop)
  • Ще ги изпълним върху pool от нишки (брой = ~брой ядра)
def doSomethingDangerous(onComplete: Try[Result] => Unit): Unit

Callbacks – негативи

  • Императивни, работят с mutable state
  • Некомпозитни. Callback hell
  • Ако се изпълняват в различни нишки, изискват синхронизация
  • Ръчно спряване с грешки

Future

Какво бяхме постигнали с IO?

  • Страничен ефект => функционален ефект
  • Защо това ни харесва?
  • Функционално композиране на ефекти (по определена операция)
  • Странични ефекти само при изпълнение

Как би изглеждал Future

Защо точно тези операции?

При императивното всяка нишка описва собствена времева линия, преплитаща се с другите

Функционално програмиране

Изрази

val a = 10
val b = 40

val c = a * 10
val d = (a, b)
val e = a + b

def f(x: Int): String = ???
def g(x: String): Boolean = ???

def r = g(f(a))

Ефектни изрази

val a = Future(10)
val b = Future(40)

val c = a.map(_ * 10)
val d = a zip b
val e = a.zipMap(b)(_ + _)

def f(n: Int): Future[String] = ???
def g(str: String): Future[Boolean] = ???

def r = a.flatMap(f).flatMap(g)

Ефектни изрази

val a = Future(10)
val b = Future(40)

val c = a.map(_ * 10)
val d = a zip b
val e = a.zipMap(b)(_ + _)

def f(n: Int): Future[String] = ???
def g(n: Int, str: String): Future[Boolean] = ???

def r = for {
  n <- a
  str <- f(n)
  bool <- g(n, str)
} yield bool

Имплементация. Eager или lazy?

  • Ще разгледаме и двата варианта
  • Стандартния Scala Future работи eagerly
  • Future се изчислява асинхронно => допустимо е да се изчисляват eagerly
  • т.е. изчислението да започне веднага след дефинирането и когато станат готови всички зависимости
  • композицията на ефектите остава функционално

Имплементация

Реактивност на Future

  • Ще се базираме на callback – onComplete
  • Функционалните трансформации – чрез onComplete
  • Трансформиран Future също е реактивен
  • Как да го вържем с външни източници на събития?
  • Promise
    • Promise-ите ще генерират първоначални Future-и в нашата система
    • Адаптер към външния свят – реагират на събития за вход/изход

Примери

Immutability

Future е безопасен, само ако стойностите в него са immutable!

Ако не са, то тяхното състояние може да е неизвестно

Асинхронен вход/изход чрез Future

Как работи HTTP?

Предния път

  • Трансформиращи, интерактивни и реактивни системи
  • Конкурентност и паралелизъм
  • Дистрибутираност
  • Проблеми с нишките. Работа с нишки. Видимост между нишки
  • Реактивност и асинхронност чрез calllbacks
  • Преизползване на нишки чрез thread pools
  • ФП подход – композитност и безопасност чрез асинхроннен Future
  • Защо map, flatMap, zip? 🧙‍
  • Връзка към външния свят. Promise. Пример с адаптиране на HTTP библиотека

Future в Scala

  • scala.concurrent.Future
  • Използва ExecutionContext вместо Executor
  • default: import scala.concurrent.ExecutionContext.Implicits.global

От асинхронност към блокиране?

  • Await.result(future: Future[A], atMost: Duration): A
  • Използвайте рядко, например само при краищата на програмата или в тестове
  • В Scala считаме блокирането на нишки за скъпо и предпочитаме асинхронност

Съществуваща стойност към Future

Future.successful(value)
Future.failed(exception)

Web библиотека (демо)

Полезни функции

  • recover
  • recoverWith
  • fallbackTo
  • Future.sequence
  • Future.firstCompletedOf

Recover

def doSomethingForUser(userId: Id): Future[HttpResponse] =(for {
  user <- retrieveUser(userId)
  result <- doService(user, input)
} yield Ok(constructBody(result))) recover {
  case UserNotFound(userId) => NotFound
  case _: ActionNotPermitted => Unauthorized
}

Recover with друга, по-стабилна алтернатива

def dangerousService(input: ServiceInput): Future[Result] = ???
def safeService(input: ServiceInput): Future[Result] = ???

def calculate(input: ServiceInput): Future[Result] = {
  dangerousService(input).recoverWith {
    case _: ServiceFailure => safeService(input)
  }
}

Fallback to

def dangerousService(input: ServiceInput): Future[Result] = ???
def safeService(input: ServiceInput): Future[Result] = ???

def calculate(input: ServiceInput): Future[Result] = {
  dangerousService(input).fallbackTo(safeService(input))
}

За разлика от предишния вариант, тук safeService се изчислява едновременно със dangerousService

Състезание

def algorithm1(input: String): Future[Int] = ???
def algorithm2(input: String): Future[Int] = ???

def calculation(input: Stirng): Future[Int] =
  Future.firstCompletedOf(List(algorithm1(input), algorithm2(input)))

Неопределен брой независими изчисления

def retrieveAge(userId: Id): Future[Int] = ???

def averageFriendsAge(user: User): Future[Option[Int]] = for {
  friendIds: List[Id] <- user.friends
  friendsAges: List[Future[Int]] = friendIds.map(retrieveAge)
  ages <- Future.sequence(friendsAges): Future[List[Int]]
} yield average(ages)

Референтна прозрачност

def calc[T](expr: => T) = Future {
  Thread.sleep(4000)

  expr
}
val futureA = calc(42)
val futureB = calc(10)

val sum = for {
  a <- futureA
  b <- futureB
} yield a + b

println {
  Await.result(sum, 5.seconds)
}
> 52

Референтна прозрачност

def calc[T](expr: => T) = Future {
  Thread.sleep(4000)

  expr
}
val sum = for {
  a <- calc(42)
  b <- calc(10)
} yield a + b

println {
  Await.result(sum, 5.seconds)
}
> Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [5 seconds]

Правилният начин да постигнем паралелилизъм:

val sum = for {
  (a, b) <- calc(42).zip(calc(10))
} yield a + b

println {
  Await.result(sum, 5.seconds)
}
> 52

Как да се справим с това?

Lazy Future/IO (демо) 😊

Въпроси :)?

Докъде сме?

Следващия път ще си поговорим повече за абстракции

// reveal.js plugins