Concurrency Note 1: Some Idioms

Mark S. Miller markm@caplet.com
Tue, 01 Dec 1998 05:39:09 -0800


I'm sending this to warm up to writing the concurrency chapter(s?), so that
I'll be allowed to answer MarcS' questions.  ;)  The following is an edited
version of e/src/test/esrc/race.updoc.  This file was first written with
liquid-ink pen and wood-pulp paper on a plane.  When I transcribed it
literally, except for one bug, it ran the first time.  Not something I'd
normally expect for concurrency control code!

This is the most E-like E code that's been posted to the list to date.  



Given a list of promises, "race" returns a promise for the first of these
to be resolved.  The race is won by whoever first resolves the returned
promise, since promises are resolved only once.

	? define race(promises) {
	>     define [result, resolver] := PromiseMaker()
	>     define done := false
	>     for prom in promises {
	>         prom <- whenResolved(define observer(val) {
	>             if (!done) {
	>                 resolver forward(val)
	>                 done := true
	>             }
	>         })
	>     }
	>     result
	> }
	# value: <race>


In this test, c is the winner of racing a and b:

	? define [a,ar] := PromiseMaker()
	# value: [Deferred promise, Unresolved Resolver]
	
	? define [b,br] := PromiseMaker()
	# value: [Deferred promise, Unresolved Resolver]

	? define c := race([a, b])
	# value: Deferred promise

	? c
	# value: Deferred promise

	? ar forward(3)
	? c
	# value: 3

	? a
	# value: 3

	? br forward(4)
	? b
	# value: 4

	? c
	# value: 3
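
For readers more at home in Python, the same idiom can be sketched with
asyncio futures.  This is only a rough analogue, not a translation of the E
semantics: an asyncio Future stands in for the promise/resolver pair, and
the cancellation branch is an addition with no counterpart in the E code
above.

```python
import asyncio

def race(futures):
    """Return a future that settles the way the first of `futures` settles."""
    result = asyncio.get_running_loop().create_future()

    def observer(fut):
        # Only the first input to settle wins; later ones are ignored.
        if result.done():
            return
        if fut.cancelled():
            # Assumption: a cancelled input cancels the race (no E analogue).
            result.cancel()
        elif fut.exception() is not None:
            result.set_exception(fut.exception())
        else:
            result.set_result(fut.result())

    for fut in futures:
        fut.add_done_callback(observer)
    return result

async def demo_race():
    loop = asyncio.get_running_loop()
    a, b = loop.create_future(), loop.create_future()
    c = race([a, b])
    a.set_result(3)   # a settles first, so c follows a
    b.set_result(4)   # too late to affect c
    return await c, a.result(), b.result()

race_outcome = asyncio.run(demo_race())
```

As in the transcript above, c takes a's value and is unaffected when b
settles later.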



A "once" of a function is a use-once version of that function.  I.e.,
"once(func)" returns an object that will forward no more than one "run"
message to "func".  The two-argument form "once(verb, target)" is a
generalization which will forward no more than one "verb" message to the
target.

This abstraction is intended to be secure, but isn't yet, because all the
Miranda methods are forwarded to the target, including "yourself", which
the caller can use to obtain the target.  This should be simple to fix.

	? define once {
	>     to (verb, target) {
	>         define used := false
	>         define forwarder match [verb2, args] {
	>             if (verb == verb2) {
	>                 if (used) {
	>                     throw "used up"
	>                 }
	>                 used := true
	>             }
	>             E call(target, verb2, args)
	>         }
	>     }
	>     to (target) { once("run", target) }
	> }
	# value: <once>

printOnce is a println that gets used up:

	? define printOnce := once(println)
	# value: org.erights.e.elang.syntax.PrintlnFunc@1c3b1b
	
	? printOnce(3)
	3
	# value: 3

	? printOnce(4)
	# problem: org.erights.e.elang.evm.StringException: used up
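
A minimal Python sketch of the one-argument form follows, assuming a plain
callable in place of an object receiving "run".  It makes no security
claim at all: a Python closure can be inspected (for example through
forwarder.__closure__), so the target is never really hidden.

```python
def once(func):
    """Wrap `func` so that it can be called at most one time."""
    used = False

    def forwarder(*args, **kwargs):
        nonlocal used
        if used:
            raise RuntimeError("used up")
        # Mark as used before forwarding, as the E version does.
        used = True
        return func(*args, **kwargs)

    return forwarder

# Hypothetical usage: record values instead of printing them.
calls = []
record_once = once(calls.append)
record_once(3)
try:
    record_once(4)
    second_call_error = None
except RuntimeError as ex:
    second_call_error = str(ex)
```

The first call goes through to the target; the second is refused with
"used up", matching the printOnce transcript.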

I've occasionally been asked "E can fork off multiple activities using
'<-', but how can it do a join?"  In other words, how can an activity be
delayed until several others have completed?  The best answer is to use a
joining abstraction like asynchAnd below, and send
"whenResolved(delayedActivity)" to the result.

Given a list of promises for booleans, asynchAnd returns a promise for
their conjunction.  When all resolve to true, the answer resolves to true.
When any resolves to false or becomes broken, the answer likewise resolves
to false or becomes broken (without waiting for further answers).  It uses
a "once" in asking each promise for its answer to protect itself from a
buggy or malicious alleged boolean that might say "true" multiple times.


	? define asynchAnd(bools) {
	>     define [result, resolver] := PromiseMaker()
	>     define done := false
	>     define numUntrue := bools size
	>     define forward(val) {
	>         if (!done) {
	>             resolver forward(val)
	>             done := true
	>         }
	>     }
	>     define observer(bool) {
	>         if (E state(bool) == "BROKEN") {
	>             forward(bool)
	>         } else if (bool) {
	>             if ((numUntrue -= 1) <= 0) {
	>                 forward(true)
	>             }
	>         } else {
	>             forward(false)
	>         }
	>     }
	>     for bool in bools {
	>         bool <- whenResolved(once(observer))
	>     }
	>     result
	> }
	# value: <asynchAnd>

z is the asynchronous conjunction of x and y:

	? define [x,xr] := PromiseMaker()
	# value: [Deferred promise, Unresolved Resolver]

	? define [y,yr] := PromiseMaker()
	# value: [Deferred promise, Unresolved Resolver]

	? define z := asynchAnd([x,y])
	# value: Deferred promise

	? z
	# value: Deferred promise

	? xr forward(true)
	? x
	# value: true

	? z
	# value: Deferred promise

	? yr forward(false)
	? y
	# value: false

	? z
	# value: false
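
The join can be sketched in Python's asyncio along the same lines.  This is
an analogue under stated assumptions, not the E semantics: a failed future
stands in for a broken promise, the cancellation branch is an addition, and
no "once" wrapper appears because asyncio runs each done-callback at most
once per future.

```python
import asyncio

def asynch_and(futures):
    """Return a future for the conjunction of a list of boolean futures."""
    result = asyncio.get_running_loop().create_future()
    num_untrue = len(futures)

    def observer(fut):
        nonlocal num_untrue
        if result.done():
            return
        if fut.cancelled():
            result.cancel()          # assumption: no E counterpart
        elif fut.exception() is not None:
            result.set_exception(fut.exception())   # "broken" input
        elif fut.result():
            num_untrue -= 1
            if num_untrue <= 0:
                result.set_result(True)
        else:
            result.set_result(False)  # short-circuit on the first false

    for fut in futures:
        fut.add_done_callback(observer)
    return result

async def demo_asynch_and():
    loop = asyncio.get_running_loop()
    x, y = loop.create_future(), loop.create_future()
    z = asynch_and([x, y])
    x.set_result(True)
    await asyncio.sleep(0)            # let the observer for x run
    pending_after_x = not z.done()
    y.set_result(False)
    mixed = await z
    p, q = loop.create_future(), loop.create_future()
    joined = asynch_and([p, q])
    p.set_result(True)
    q.set_result(True)
    return pending_after_x, mixed, await joined

and_outcome = asyncio.run(demo_asynch_and())
```

The demo repeats the x/y transcript (still pending after x, then false)
and adds an all-true case to show the join completing.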


Imports needed to delay activity:

	? define TimerMaker := import:org.erights.e.extern.timer.Timer maker
	# value: statics of org.erights.e.extern.timer.Timer

	? define Timer := TimerMaker theTimer
	# value: org.erights.e.extern.timer.Timer@1be8d2


"timeBomb(5000)" returns a promise that will become broken in 5000
milliseconds, i.e., 5 seconds.

	? define timeBomb(millis) {
	>     define [result, resolver] := PromiseMaker()
	>     Timer after(millis, define trigger noticeTimeout {
	>         try { throw "time's up" } catch ex {
	>             resolver smash(ex)
	>         }
	>     })
	>     result
	> }
	# value: <timeBomb>

Put on your safety goggles:

	? define bomb := timeBomb(5000)
	# value: Deferred promise

	? bomb
	# value: Deferred promise

Wait 6 seconds (need to say this in a way an automated regression test can
obey).

	? bomb
	# value: Promise broken with org.erights.e.elang.evm.StringException: 
	#        time's up
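
An asyncio sketch of the same idea, assuming loop.call_later in the role of
the Timer and a TimeoutError in the role of the smashed promise's problem.
The demo uses 50 milliseconds rather than 5000 so that it finishes quickly.

```python
import asyncio

def time_bomb(millis):
    """Return a future that fails with TimeoutError after `millis` ms."""
    loop = asyncio.get_running_loop()
    result = loop.create_future()

    def trigger():
        # Guard in case something else settled the future first.
        if not result.done():
            result.set_exception(TimeoutError("time's up"))

    loop.call_later(millis / 1000, trigger)
    return result

async def demo_time_bomb():
    bomb = time_bomb(50)
    pending_at_first = not bomb.done()
    try:
        await bomb
    except TimeoutError as ex:
        return pending_at_first, str(ex)

bomb_outcome = asyncio.run(demo_time_bomb())
```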


By combining timeBomb with race or a joining construct (like asynchAnd), we
have timeouts!  For example, Alice can ask Bob for an integer, but be sure
to have a resolved answer within around 5 seconds even if Bob is wedged:

	define answer := race([bob <- gimmeInteger, timeBomb(5000)])
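
Python's asyncio packages a similar race-against-a-timer as
asyncio.wait_for.  The sketch below is an analogue only: wedged_bob and
prompt_bob are hypothetical stand-ins for Bob, and unlike the E idiom,
wait_for cancels the losing request instead of merely ignoring it.

```python
import asyncio

async def wedged_bob():
    # Hypothetical Bob who never answers: waits on an event nobody sets.
    await asyncio.Event().wait()

async def prompt_bob():
    # Hypothetical Bob who answers at once.
    return 42

async def ask_with_timeout(request, seconds):
    """Await `request`, giving up after `seconds` with a marker value."""
    try:
        return await asyncio.wait_for(request, timeout=seconds)
    except asyncio.TimeoutError:
        return "time's up"

wedged_answer = asyncio.run(ask_with_timeout(wedged_bob(), 0.05))
prompt_answer = asyncio.run(ask_with_timeout(prompt_bob(), 0.05))
```

Either way the caller has a settled answer within roughly the timeout,
which is the property Alice wants.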


Hope this gives a helpful first taste of E's concurrency control.

	Cheers,
	--MarkM