A presentation made at ScalaSyd.
It is not just about the cores. The world is asynchronous.
A microcontroller collecting events from sensors around the house.
Develop a minimal, general purpose concurrent programming library that is functional and well typed.
But it is a popular system...
See my "Erlang Style" blog for examples of the problem.
Concurrent | Functional |
---|---|
Asynchronously evolving state variables. | No mutable state. |
Synchronization is an effect. | No side effects. |
Please don't make me go back to while loops :-(
Manage the effects and state using a structure built from pure functions.
Keep synchronization separate from...
concurrent process definition, separate from...
asynchronous execution, separate from...
error handling.
Lock Free* | Blocking |
---|---|
java.util.concurrent.atomic.AtomicReference | synchronized(obj); wait(); notify() |
java.util.concurrent.ExecutorService | java.lang.Thread |
*We will be using this column.
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

type T
val t0: T
val ref = new AtomicReference[T](t0)

// atomically apply a pure function to the state, returning the previous value
def transact(f: T => T): T = {
  @tailrec
  def attempt(): T = {
    val t = ref.get
    if (ref.compareAndSet(t, f(t))) t else attempt()
  }
  attempt()
}
def ff[T:Monoid](t: T): T => T = Monoid.plus(_, t)
// on thread 1
ts1 foreach { t => transact(ff(t)) }
// on thread 2
ts2 foreach { t => transact(ff(t)) }
// later...
val sum = transact(identity)
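As a concrete, self-contained illustration of the same pattern (a sketch only: Int addition stands in for the Monoid, and the thread setup is illustrative):

import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

object AtomicSum extends App {
  val ref = new AtomicReference[Int](0)

  // the same compare-and-set retry loop as above, specialised to Int
  @tailrec
  def transact(f: Int => Int): Int = {
    val t = ref.get
    if (ref.compareAndSet(t, f(t))) t else transact(f)
  }

  def summer(ts: Seq[Int]) = new Thread {
    override def run(): Unit = ts foreach { i => transact(_ + i) }
  }

  val (t1, t2) = (summer(1 to 1000), summer(1 to 1000))
  t1.start(); t2.start(); t1.join(); t2.join()

  println(transact(identity)) // 1001000: no update is lost
}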
obj.wait(); // suspend everything after the ";" in a Thread object
theRestOfTheComputation();
// and later on another thread...
obj.notify(); // resume the suspended Thread
As seen in java.util.concurrent.BlockingQueue
val item = blockingQueue.take();
k(item); // suspended until item available
import scala.collection.immutable.Queue

type V
// either a queue of waiting consumers or a queue of available values
type T = Either[Queue[V => Unit], Queue[V]]
val t0: T = Left(Queue())

def take(k: V => Unit): Unit = {
  transact {
    case Left(ks)                  => Left(ks :+ k)   // join the waiting consumers
    case Right(ts) if ! ts.isEmpty => Right(ts.tail)  // claim the next value
    case _                         => Left(Queue(k))  // nothing available: start waiting
  } match {
    // transact returns the previous state; if a value was claimed, deliver it
    case Right(ts) if ! ts.isEmpty => k apply ts.head
    case _ =>
  }
}

// ...
take(k) // k is suspended until a V is available
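The matching offer is not shown; a plausible sketch over the same state type, again using the previous state returned by transact to decide whether a waiting consumer should be resumed:

def offer(v: V): Unit = {
  transact {
    case Right(ts)                => Right(ts :+ v)   // queue the value
    case Left(ks) if ! ks.isEmpty => Left(ks.tail)    // claim a waiting consumer
    case _                        => Right(Queue(v))  // no waiters: start a value queue
  } match {
    case Left(ks) if ! ks.isEmpty => ks.head apply v  // resume the waiting consumer
    case _ =>
  }
}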
import scala.concurrent.stm._

val someCondition = Ref(false)

new Thread {
  override def run(): Unit = {
    // ...
    atomic { implicit txn =>
      if (! someCondition())
        retry // wait for something to change
      continueWithConditionSatisfied
    }
  }
}.start()
The retry operator is nice, but programming with Refs is not functional.
// a state transition plus an effect to run after the transition commits
trait Transaction[T] {
  def transition: T => Option[T] // None: the transaction cannot fire yet
  def effect: T => Unit          // given the pre-transition value
}

// runs transactions against a piece of state, retrying pending ones as the state changes
trait Transactor[T] {
  def run(r: Transaction[T]): Unit
}
case class State(t: T, rs: List[Transaction[T]])
val cell = new AtomicReference(State(t0, Nil))

def run(r: Transaction[T]): Unit = {
  @tailrec
  def attempt(): Unit = {
    val s0 @ State(t, rs) = cell.get
    r.transition(t) match {
      case Some(t1) =>
        if (! cell.compareAndSet(s0, State(t1, Nil)))
          attempt()
        else {
          r.effect(t)    // effect sees the pre-transition value
          rs foreach run // retry transactions that were pending
        }
      case None =>
        if (! cell.compareAndSet(s0, State(t, r :: rs)))
          attempt()
    }
  }
  attempt()
}
First, a convenience method for Transactor:
def transact(pf: PartialFunction[T, T])(k: T => Unit) =
  run {
    new Transaction[T] {
      val transition = pf.lift
      val effect = k
    }
  }
}
val v0: Long
val state = Transactor(v0)

// P or wait
def take(k: Long => Unit): Unit =
  state.transact { case v if v > 0 => v - 1 } { k }

// V or signal
def offer(i: Long)(k: => Unit): Unit =
  state.transact { case v => v + i } { _ => k }
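Hypothetical usage, assuming the two methods above are wrapped in a Semaphore class constructed with the initial count (the continuation receives the pre-transition count):

val sem = new Semaphore(1)
sem.take { before => println(s"acquired; count before take was $before") }
sem.offer(1) { println("released") }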
import scala.collection.immutable.Queue

val backlog: Int
val state = Transactor(Queue[T]())

def take(k: T => Unit): Unit =
  state.transact {
    case q if ! q.isEmpty => q.tail
  } {
    q => k(q.head) // q is the queue before the take
  }

def offer(t: T)(k: => Unit): Unit =
  state.transact {
    case q if q.length < backlog => q enqueue t
  } {
    _ => k
  }
val initial: Option[T]
val state = Transactor(initial)
def take( k: T => Unit): Unit =
state.transact { case ot @ Some(_) => ot } { _ foreach k }
def offer(ot: Option[T])(k: => Unit): Unit =
state.transact { assign(ot) } { _ => k }
val state = Transactor(0L)
def take( k: Long => Unit): Unit =
state.transact { noChange } { v0 =>
state.transact { case v1 if v1 > v0 => v1 } { k }
}
def offer(u: Unit)(k: => Unit): Unit =
state.transact { case v0 => v0 + 1 } { _ => k }
val state = Transactor(None: Option[T])
def take( k: T => Unit): Unit =
state.transact {
case ot @ Some(_) => ot
} {
_ foreach k
}
def offer(t: T)(k: => Unit): Unit =
state.transact {
case ot @ Some(_) => ot
case None => Some(t)
} {
_ => k
}
trait State
case object Empty extends State
case class Half(t1: T1) extends State
case class Full(t12: (T1,T2)) extends State
private val state = Transactor(Empty: State)
def take(k: ((T1, T2)) => Unit): Unit =
state.transact { case Full(_) => Empty } { case Full(t12) => k(t12) }
def offer(et: Either[T1,T2])(k: => Unit): Unit = et match {
case Left(t1) =>
state.transact { case Empty => Half(t1) } { _ => k }
case Right(t2) =>
state.transact { case Half(t1) => Full((t1, t2)) } { _ => k }
}
import scalaz.concurrent.Future
import Future.async

val input: Channel[String]
val output: Channel[String]

// copy the first n lines from input to output, then keep draining input
def headLines(n: Int): Future[Nothing] =
  async(input.take) flatMap { line =>
    if (n == 0) headLines(0)
    else
      async[Unit] {
        k => output.offer(line)(k(()))
      } flatMap { _ =>
        headLines(n - 1)
      }
  }
Free is the essence of sequential operation aka flatmappyness.
sealed trait Free[V[_], +A]
case class Return[V[_], +A](a: A) extends Free[V,A]
case class Bind[V[_], A, +B](va: V[A], f: A => Free[V,B]) extends Free[V,B]
V[A] is an instruction to produce an A.
Free[V, B] is a process that will produce a B.
A => Free[V, B] is a step in the process.
Check out Runar Bjarnason's Scala Days talk "Compositional Application Architecture With Reasonably Priced Monads".
https://dl.dropboxusercontent.com/u/4588997/ReasonablyPriced.pdf
def flatMap[B](f: A => Free[V,B]): Free[V,B] = this match {
  case Return(a) => f(a)
  case Bind(vx, g) => Bind(vx, g andThen (_ flatMap f))
}
V[A] is an "instruction" to produce an A. A Free[V, B] is a process built from instructions in V that produces a B.
import scalaz.{Monad, ~>}

def foldMap[V[_], W[_]: Monad, A](ma: Free[V,A])(f: V ~> W): W[A] =
  ma match {
    case Return(a) => Monad[W].point(a)
    case Bind(fx, g) =>
      Monad[W].bind(f(fx)) { a =>
        foldMap(g(a))(f)
      }
  }
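A small interpreter sketch in the spirit of the referenced talk (the Interact instruction set and the console interpreter here are illustrative, not part of the library): a natural transformation into scalaz's Id runs the instructions immediately.

import scalaz.~>
import scalaz.Id._ // brings in the Id type alias and its implicit Monad instance

sealed trait Interact[A]
case class Ask(prompt: String) extends Interact[String]
case class Tell(msg: String) extends Interact[Unit]

val console: Interact ~> Id = new (Interact ~> Id) {
  def apply[A](i: Interact[A]): Id[A] = i match {
    case Ask(prompt) => println(prompt); scala.io.StdIn.readLine()
    case Tell(msg)   => println(msg)
  }
}

// a Free program built directly from the constructors: ask for a name, then greet
val greet: Free[Interact, Unit] =
  Bind(Ask("What is your name?"), (name: String) =>
    Bind(Tell(s"Hello, $name"), (_: Unit) =>
      Return[Interact, Unit](())))

foldMap(greet)(console)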
sealed trait Process[+U] extends Process.ProcessOps[U]
// a constant process
case class Complete[U](u: U) extends Process[U]
// one process after another
case class Sequential[V, U]( process: Process[V],
step: V => Process[U]) extends Process[U]
// a trampoline
case class Ready[U]( step: () => Process[U] ) extends Process[U]
// failed state triggers error handling
case class Failed(e: Throwable) extends Process[Nothing]
// naming processes helps diagnostics
case class Named[U](name: String, step: Process[U])
extends Process[U] {
override def toString = s"Process($name)"
}
// states for concurrent processes
case class Waiting[U]( respond: (U => Unit) => Unit)
extends Process[U]
case class Asynchronous[U]( step: () => Process[U] )
extends Process[U]
case class Parallel( p1: Process[Any])
extends Process[Unit]
trait Site {
def success[U](p0: Process[U], u: U): Unit
def failure[U](p0: Process[U], e: Throwable): Unit
def executor: ExecutorService
def run[U](p0: Process[U]): Unit = ...
}
A family of processes is executed at a Site.
def adder(total: Int=0): Process[Int] = receive(inputQ) >>= {
case i if i > limit => send(actionQ, i) >> adder(total)
case i if i < 0 => fail("negative reading")
case i if i == 0 => stop(total)
case i => adder(total + i)
}
adder().run
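The receive, send, stop, and fail combinators used by adder are not spelled out here; a plausible sketch in terms of the Process constructors above, assuming the channel interface used earlier in the talk:

trait Channel[A] {
  def take(k: A => Unit): Unit
  def offer(a: A)(k: => Unit): Unit
}

def receive[A](in: Channel[A]): Process[A] =
  Waiting(k => in.take(k))          // suspend until the channel delivers a value

def send[A](out: Channel[A], a: A): Process[Unit] =
  Waiting(k => out.offer(a)(k(()))) // suspend until the channel accepts the value

def stop[A](a: A): Process[A] = Complete(a)

def fail(message: String): Process[Nothing] = Failed(new Exception(message))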
type Input[A] = Process[A]
type Output[A] = A => Process[Unit]
type Row
type Sum
def foldRows:
Input[Row] => Output[Sum] => Process[Nothing] = ...
def generateRows:
Output[Row] => Process[Nothing] = ...
val backlog: Int
val rows = channel[Row](backlog)
val sums = channel[Sum](backlog)
Site run {
generateRows :-> rows &
rows ->: foldRows :-> sums
}
The & operator runs processes in parallel. The ->: and :-> operators apply inputs and outputs to the process definitions.
We went from AtomicReference to Transaction, ultimately creating lock-free versions of common concurrent programming types.
We borrowed or built a Process monad that runs process steps in an ExecutorService.
The result is a minimal, general-purpose concurrent programming library that is functional and well typed.
@a4dev
Core i7 Xeon 5500 Series Data Source Latency (approximate)