[e-lang] Shortcuts for asynchronous control flow (long)
Plotnikov, Constantine A
cap at isg.axmor.com
Tue Jul 4 11:12:36 EDT 2006
I'm still working on Sebyla and there are ideas on asynchronous control
flow that might be interesting to E. I plan to introduce OCAM-like "seq"
and "par" expressions. I also plan new expressions named "serialized"
and "using" (like one in C#).
These expressions have obvious expansion to "when" expression. However
if there are problems with expansion, I might write about expansion in
follow up messages.
*The "par" expression*
It is a quick way to organize parallel operations. Basic syntax is the
following:
Par ::= "par" "{"
<expression statement> *
"}"
The par construct wait until all promises are resolved and return a
promise for tuple that contains result of all expressions inside. If
some of calls fails the result promise is smashed with a fault that
contains a tuple with partial result (null for failed positions) and a
tuple with faults.
The expressions are evaluated independently and each expression is
implicitly surrounded with {}. And def construct in one statement has no
effect in others.
The expression might one of the following:
1. Scalar expression (int, string, etc). In case of this expression the
par does not waits for anything.
2. Promise. In that case par waits until promise is resolved.
3. Eventual reference. In that case it waits until reference is resolved.
4. Block. This blocks evaluates just like normal block expression in e.
No special rules. It allows to introduce some local scope like {def a =
(x+2*pi); a*a;}
*The "seq" expression*
The expression allows serialize actions where one action should be
executed only after other finishes. The syntax is the following
Seq ::= "seq" "{"
<expression statement> | <wait statement>*
"}"
<wait statement> ::= "wait" <varname> ":=" <expression> ";"
The expression evaluates each expression statement sequentially. And
waits for termination using rules similar to ones from the "par"
expression. The expression evaluates to promise for the value of the
last expression. If one of statements fails, the expression is smashed
with that fault.
Like in "par", defs in expressions could not be seen in other
statements. However there is a "wait" statement that allows to introduce
value visible in other statements.
This construct could be useful to create IO loops. For example
read/write loop could be written as the following:
# the function below forwards data and returns total length when
finishes writing.
def forward(in, out, maxReadDataSize): vow[int] {
def forward(in, out, sum) {
return seq {
wait data := in.read(maxReadDataSize)
# expression below is simple if expression that evaluates to
either int or p
if(data == EOF) {
sum
} else {
seq {
out.write();
forward(in, out, sum + data.length())
}
}
}
}
return forward(in, out, 0)
}
Note that this loop creates a lot of promises for returning result, but
this could be reduced using compiler (see
http://www.eros-os.org/pipermail/e-lang/2004-February/009529.html).
*The "using" expression*
This expression is inspired by C# "using" statement. The syntax is the
following:
Using :: = "using" "(" ("def" <name> ":=")? <expression> ")" "{"
<statement>*
"}"
The expression assumes existence of interface similar to the following:
interface Closable {
to close() : any
}
The expression evaluates to promise for result of the body block. The
expression waits until result of expression is resolved than it checks
if the result expression supports closable interface. If it does not
supports, the expression fails. If it supports, the value is bound to
name, and code block is executed. After value of the block is resolved,
the using construct waits until promise from close operation is
resolved, and than resolves result promise with value returned from body
block.
If body block fails, close operation is executed anyway and expression
fails with exception from body. If body is successful but close fails,
expression fails with exception from close.
Note that closable might be immediate object or eventual reference. In
case of immediate object, close operation can return null meaning that
close operation has finished in this synchronous call. For example,
InputStream wrapper could close input stream synchronously using Java
close call.
For example copy file could be written like the following:
def copyFile(fileFactory, nameIn, nameOut) : vow[int] {
return using(def in := fileFactory.open(nameIn, "r")) {
using(def out := fileFactory.open(nameOut, "w")) {
# function from the "seq" example
forward(in,out)
}
}
}
*The "serialized" expression*
This expression is most tricky one. It allows to serialize requests to
objects that needs it. The problem with asynchronous servers that
request might come before previous is finished to serve.
To solve this problem the following utility object is introduced to
standard library.
def makeRequestQueue() {
# this returns closable that closes current operation and start next
operation
# the close operation returns nil
def makeClosable() : Closable {
...
}
return object {
# this method tries to start operation immediately, if there is
# a pending request it returns null, which means that operation
# cannot be started.
to tryStartRequest() : Closable {
...
}
# start operation when possible.
to startRequest() : vow[Closable] {
...
}
}
}
For implementation of this class see the following Java source that I
have created for asyncobjects framework.
http://svn.sourceforge.net/viewcvs.cgi/asyncobjects/asyncobjects/trunk/src/net.sf.asyncobjects/src/net/sf/asyncobjects/util/RequestQueue.java?view=markup&rev=10
The syntax of expression is the following:
serialized(<expression>) {
<body>
}
The expression is expanded like the following:
{
def rq := <expression>
def cl := rq.tryStartRequest()
if(cl != null) {
using(cl) {
<body>
}
} else {
using(rq.startRequest() {
<body>
}
}
}
The construct can be used like the following (the code also demonstrates
usage of some other expressions):
def makeBufferedStream(in, maxDataSize) : Stream {
def rq := makeRequestQueue()
def data := null
def eofSeen := false
def closed := false
return object {
to ensureOpen() {
if(closed) {throw StreamClosed}
}
to read(clientMaxDataSize) : any{
return serialized(rq) {
ensureOpen()
seq {
ensureHasData()
if(data == null) {
EOF
} else {
if(data.length <= clientMaxDataSize) {
def rc := data
data := null
rc
} else {
def rc := data.prefix(clientDataSize)
data := data.suffix(clientDataSize)
rc
}
}
}
}
}
to enshureHasData() : any {
return if(data == null) {
if(!eofSeen) {
seq {
wait readData := in.read(maxDataSize)
if(readData == EOF) {
eofSeen = true
} else {
data := readData
}
}
} else {
null
}
} else {
null
}
}
to close() : any {
return serialized(rq) {
ensureOpen();
closed := true
in.close();
}
}
}
}
Constanine
More information about the e-lang
mailing list