I'm sending this to warm up to writing the concurrency chapter(s?), so that I'll be allowed to answer MarcS' questions. ;) The following is an edited version of e/src/test/esrc/race.updoc. This file was first written with a liquid-ink pen and wood-pulp paper on a plane. When I transcribed it literally, it ran the first time except for one bug. Not something I'd normally expect for concurrency control code!
This is the most E-like E code that's been posted to the list to date.
Given a list of promises, "race" returns a promise for the first of these to be resolved. The race is won by whoever first resolves the returned promise, since promises are resolved only once.
? define race(promises) {
>     define [result, resolver] := PromiseMaker()
>     define done := false
>     for prom in promises {
>         prom <- whenResolved(define observer(val) {
>             if (!done) {
>                 resolver forward(val)
>                 done := true
>             }
>         })
>     }
>     result
> }
# value: <race>
In this test, c is the winner of racing a and b:
? define [a,ar] := PromiseMaker()
# value: [Deferred promise, Unresolved Resolver]
? define [b,br] := PromiseMaker()
# value: [Deferred promise, Unresolved Resolver]
? define c := race([a, b])
# value: Deferred promise
? c
# value: Deferred promise
? ar forward(3)
? c
# value: 3
? a
# value: 3
? br forward(4)
? b
# value: 4
? c
# value: 3
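For readers without an E interpreter handy, here is a rough Python analogue of race, offered only as a sketch. It assumes asyncio futures as stand-ins for E promises and add_done_callback in place of whenResolved; the names observer and demo are illustrative and not part of race.updoc.

```python
import asyncio

def race(futures):
    """Return a future that takes on the outcome of whichever input settles first."""
    loop = asyncio.get_running_loop()
    result = loop.create_future()

    def observer(fut):
        # Plays the role of the `done` flag: only the first settler wins.
        if result.done():
            return
        if fut.cancelled():
            result.cancel()
        elif fut.exception() is not None:
            result.set_exception(fut.exception())
        else:
            result.set_result(fut.result())

    for fut in futures:
        fut.add_done_callback(observer)
    return result

async def demo():
    loop = asyncio.get_running_loop()
    a, b = loop.create_future(), loop.create_future()
    c = race([a, b])
    a.set_result(3)
    await asyncio.sleep(0)  # yield once so the scheduled done-callbacks run
    b.set_result(4)
    await asyncio.sleep(0)
    return c.result()

race_winner = asyncio.run(demo())
print(race_winner)
```

As in the transcript above, the late arrival of b's value leaves c unchanged.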
A "once" of a function is a use-once version of that function. I.e., "once(func)" returns an object that will forward no more than one "run" message to "func". The two-argument form "once(verb, target)" is a generalization that will forward no more than one "verb" message to the target; messages with any other verb are forwarded without limit.
This abstraction is intended to be secure, but isn't yet, because all the Miranda methods are forwarded to the target, including "yourself", which the caller can use to obtain the target. This should be simple to fix.
? define once {
>     to (verb, target) {
>         define used := false
>         define forwarder match [verb2, args] {
>             if (verb == verb2) {
>                 if (used) {
>                     throw "used up"
>                 }
>                 used := true
>             }
>             E call(target, verb2, args)
>         }
>     }
>     to (target) { once("run", target) }
> }
# value: <once>
printOnce is a println that gets used up:
? define printOnce := once(println)
# value: org.erights.e.elang.syntax.PrintlnFunc@1c3b1b
? printOnce(3)
3
# value: 3
? printOnce(4)
# problem: org.erights.e.elang.evm.StringException: used up
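The one-argument form of once can be sketched in Python as a closure. This is an illustration only: it covers just the "run" case (a plain call), it does not model the two-argument verb form, and since Python closures can be introspected it makes no security claim at all. The name print_once is hypothetical.

```python
def once(target):
    """Wrap `target` so that it can be called at most one time."""
    used = False

    def forwarder(*args, **kwargs):
        nonlocal used
        if used:
            raise RuntimeError("used up")
        used = True
        return target(*args, **kwargs)

    return forwarder

print_once = once(print)
print_once(3)  # prints 3
try:
    print_once(4)
except RuntimeError as err:
    print("problem:", err)  # prints "problem: used up"
```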
I've occasionally been asked "E can fork off multiple activities using '<-', but how can it do a join?" In other words, how can an activity be delayed until several others have completed? The best answer is to use a joining abstraction like asynchAnd below, and send "whenResolved(delayedActivity)" to the result.
Given a list of promises for booleans, asynchAnd returns a promise for their conjunction. When all resolve to true, the answer resolves to true. As soon as any resolves to false or becomes broken, the answer resolves to false or becomes likewise broken, without waiting for further answers. It wraps the observer in a "once" when asking each promise for its answer, to protect itself from a buggy or malicious alleged boolean that might say "true" multiple times.
? define asynchAnd(bools) {
>     define [result, resolver] := PromiseMaker()
>     define done := false
>     define numUntrue := bools size
>     define forward(val) {
>         if (!done) {
>             resolver forward(val)
>             done := true
>         }
>     }
>     define observer(bool) {
>         if (E state(bool) == "BROKEN") {
>             forward(bool)
>         } else if (bool) {
>             if ((numUntrue -= 1) <= 0) {
>                 forward(true)
>             }
>         } else {
>             forward(false)
>         }
>     }
>     for bool in bools {
>         bool <- whenResolved(once(observer))
>     }
>     result
> }
# value: <asynchAnd>
z is the asynchronous conjunction of x and y:
? define [x,xr] := PromiseMaker()
# value: [Deferred promise, Unresolved Resolver]
? define [y,yr] := PromiseMaker()
# value: [Deferred promise, Unresolved Resolver]
? define z := asynchAnd([x,y])
# value: Deferred promise
? z
# value: Deferred promise
? xr forward(true)
? x
# value: true
? z
# value: Deferred promise
? yr forward(false)
? y
# value: false
? z
# value: false
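The same join can be sketched with Python's asyncio, again only as a hedged analogue. It assumes asyncio futures in place of E promises, with an exception standing in for a broken promise. No "once" wrapper appears because an asyncio future settles only once and runs each done-callback a single time; the names asynch_and, remaining and demo are illustrative.

```python
import asyncio

def asynch_and(futures):
    """Return a future for the conjunction of futures that yield booleans."""
    loop = asyncio.get_running_loop()
    result = loop.create_future()
    remaining = len(futures)  # corresponds to numUntrue

    def observer(fut):
        nonlocal remaining
        if result.done():
            return
        if fut.cancelled():
            result.cancel()
        elif fut.exception() is not None:
            # A failed input fails the answer without waiting for the rest.
            result.set_exception(fut.exception())
        elif fut.result():
            remaining -= 1
            if remaining <= 0:
                result.set_result(True)
        else:
            result.set_result(False)

    for fut in futures:
        fut.add_done_callback(observer)
    return result

async def demo():
    loop = asyncio.get_running_loop()
    x, y = loop.create_future(), loop.create_future()
    z = asynch_and([x, y])
    x.set_result(True)
    await asyncio.sleep(0)  # yield once so the scheduled done-callbacks run
    still_pending = not z.done()
    y.set_result(False)
    await asyncio.sleep(0)
    return still_pending, z.result()

print(asyncio.run(demo()))
```

To delay an activity until the join completes, one would await the returned future (or attach a done-callback to it), which plays the role of sending whenResolved to the result.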
Imports needed to delay activity:
? define TimerMaker := import:org.erights.e.extern.timer.Timer maker
# value: statics of org.erights.e.extern.timer.Timer
? define Timer := TimerMaker theTimer
# value: org.erights.e.extern.timer.Timer@1be8d2
"timeBomb(5000)" returns a promise that will become broken in 5000 milliseconds, i.e., 5 seconds.
? define timeBomb(millis) {
>     define [result, resolver] := PromiseMaker()
>     Timer after(millis, define trigger noticeTimeout {
>         try { throw "time's up" } catch ex {
>             resolver smash(ex)
>         }
>     })
>     result
> }
# value: <timeBomb>
Put on your safety goggles:
? define bomb := timeBomb(5000)
# value: Deferred promise
? bomb
# value: Deferred promise
Wait 6 seconds (need to say this in a way an automated regression test can obey).
? bomb
# value: Promise broken with org.erights.e.elang.evm.StringException: time's up
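A Python sketch of the same idea, assuming asyncio's loop.call_later as a stand-in for E's Timer and a future that fails with TimeoutError as a stand-in for a broken promise. The names time_bomb, trigger and demo are illustrative, and the demo uses a 50 millisecond fuse so that it finishes quickly.

```python
import asyncio

def time_bomb(millis):
    """Return a future that fails with TimeoutError after `millis` milliseconds."""
    loop = asyncio.get_running_loop()
    result = loop.create_future()

    def trigger():
        if not result.done():
            result.set_exception(TimeoutError("time's up"))

    loop.call_later(millis / 1000, trigger)  # call_later takes seconds
    return result

async def demo():
    bomb = time_bomb(50)
    done_before = bomb.done()
    await asyncio.sleep(0.2)  # wait well past the 50 ms fuse
    return done_before, bomb.exception()

done_before, error = asyncio.run(demo())
print(done_before, repr(error))
```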
By combining timeBomb with race or a joining construct (like asynchAnd), we have timeouts! For example, Alice can ask Bob for an integer, but be sure to have a resolved answer within around 5 seconds even if Bob is wedged:
define answer := race([bob <- gimmeInteger, timeBomb(5000)])
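For comparison, here is how the same timeout pattern might look in Python. This sketch leans on asyncio.wait with FIRST_COMPLETED as a ready-made race and a sleeping task as the bomb; with_timeout, prompt_bob and wedged_bob are hypothetical names invented for the example.

```python
import asyncio

async def with_timeout(awaitable, millis):
    """Return the awaitable's result, or raise TimeoutError after `millis` ms."""
    task = asyncio.ensure_future(awaitable)
    bomb = asyncio.ensure_future(asyncio.sleep(millis / 1000))
    done, pending = await asyncio.wait(
        {task, bomb}, return_when=asyncio.FIRST_COMPLETED
    )
    for loser in pending:
        loser.cancel()  # the loser of the race is no longer needed
    if task in done:
        return task.result()
    raise TimeoutError("time's up")

async def prompt_bob():
    return 7  # answers immediately

async def wedged_bob():
    await asyncio.sleep(3600)  # effectively never answers
    return 42

async def demo():
    quick = await with_timeout(prompt_bob(), 100)
    try:
        await with_timeout(wedged_bob(), 100)
        slow = "answered"
    except TimeoutError:
        slow = "timed out"
    return quick, slow

print(asyncio.run(demo()))
```

One difference from the E version is that the losing task is cancelled here, whereas race simply ignores later resolutions.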
Hope this gives a helpful first taste of E's concurrency control.
Cheers, --MarkM