[e-lang] Shortcuts for asynchronous control flow (long)

Plotnikov, Constantine A cap at isg.axmor.com
Tue Jul 4 11:12:36 EDT 2006


I'm still working on Sebyla and there are ideas on asynchronous control 
flow that might be interesting to E. I plan to introduce OCAM-like "seq" 
and "par" expressions. I also plan new expressions named "serialized" 
and "using" (like one in C#).

These expressions have obvious expansion to "when" expression. However 
if there are problems with expansion, I might write about expansion in 
follow up messages.

*The "par" expression*

It is a quick way to organize parallel operations. Basic syntax is the 
following:

Par ::= "par" "{"
  <expression statement> *
"}"

The par construct wait until all promises are resolved and return a 
promise for tuple that contains result of all expressions inside. If 
some of calls fails the result promise is smashed with a fault that 
contains a tuple with partial result (null for failed positions) and a 
tuple with faults.

The expressions are evaluated independently and each expression is 
implicitly surrounded with {}. And def construct in one statement has no 
effect in others.

The expression might one of the following:
1.  Scalar expression (int, string, etc). In case of this expression the 
par does not waits for anything.
2.  Promise. In that case par waits until promise is resolved.
3.  Eventual reference. In that case it waits until reference is resolved.
4.  Block. This blocks evaluates just like normal block expression in e. 
No special rules. It allows to introduce some local scope like {def a = 
(x+2*pi); a*a;}

*The "seq" expression*

The expression allows serialize actions where one action should be 
executed only after other finishes. The syntax is the following

Seq ::= "seq" "{"
  <expression statement> | <wait statement>*
"}"

<wait statement> ::= "wait" <varname> ":=" <expression> ";"

The expression evaluates each expression statement sequentially. And 
waits for termination using rules similar to ones from the "par" 
expression. The expression evaluates to promise for the value of the 
last expression. If one of statements fails, the expression is smashed 
with that fault.

Like in "par", defs in expressions could not be seen in other 
statements. However there is a "wait" statement that allows to introduce 
value visible in other statements.

This construct could be useful to create IO loops. For example 
read/write loop could be written as the following:
# the function below forwards data and returns total length when 
finishes writing.
def forward(in, out, maxReadDataSize): vow[int] {
  def forward(in, out, sum) {
    return seq {
      wait data := in.read(maxReadDataSize)
      # expression below is simple if expression that evaluates to 
either int or p  
      if(data == EOF) {
        sum
      } else {
        seq {
          out.write();
          forward(in, out, sum + data.length())  
        }
      }
    }
  }
  return forward(in, out, 0)      
}

Note that this loop creates a lot of promises for returning result, but 
this could be reduced using compiler (see 
http://www.eros-os.org/pipermail/e-lang/2004-February/009529.html).

*The "using" expression*

This expression is inspired by C# "using" statement. The syntax is the 
following:

Using :: = "using" "("  ("def" <name> ":=")? <expression> ")" "{"
  <statement>*
"}"

The expression assumes existence of interface similar to the following:

interface Closable {
  to close() : any
}

The expression evaluates to promise for result of the body block. The 
expression waits until result of expression is resolved than it checks 
if the result expression supports closable interface. If it does not 
supports, the expression fails. If it supports, the value is bound to 
name, and code block is executed. After value of the block is resolved, 
the using construct waits until promise from close operation is 
resolved, and than resolves result promise with value returned from body 
block.

If body block fails, close operation is executed anyway and expression 
fails with exception from body. If body is successful but close fails, 
expression fails with exception from close.

Note that closable might be immediate object or eventual reference. In 
case of immediate object, close operation can return null meaning that 
close operation has finished in this synchronous call. For example, 
InputStream wrapper could close input stream synchronously using Java 
close call.

For example copy file could be written like the following:

def copyFile(fileFactory, nameIn, nameOut) : vow[int] {
  return using(def in := fileFactory.open(nameIn, "r")) {
    using(def out := fileFactory.open(nameOut, "w")) {
      # function from the "seq" example
      forward(in,out)
    }
  }
}

*The "serialized" expression*

This expression is most tricky one. It allows to serialize requests to 
objects that needs it. The problem with asynchronous servers that 
request might come before previous is finished to serve.

To solve this problem the following utility object is introduced to 
standard library.

def makeRequestQueue() {
  # this returns closable that closes current operation and start next 
operation
  # the close operation returns nil
  def makeClosable() : Closable {
    ...
  }
  return object {
    # this method tries to start operation immediately, if there is
    # a pending request it returns null, which means that operation
    # cannot be started.
    to tryStartRequest() : Closable {
       ...
    }
    # start operation when possible.
    to startRequest() : vow[Closable] {
       ...
    }
}
}

For implementation of this class see the following Java source that I 
have created for asyncobjects framework.

http://svn.sourceforge.net/viewcvs.cgi/asyncobjects/asyncobjects/trunk/src/net.sf.asyncobjects/src/net/sf/asyncobjects/util/RequestQueue.java?view=markup&rev=10

The syntax of expression is the following:

serialized(<expression>) {
  <body>
}

The expression is expanded like the following:

{
  def rq := <expression>
  def cl := rq.tryStartRequest()
  if(cl != null) {
    using(cl) {
      <body>
    }
  } else {
    using(rq.startRequest() {
      <body>
    }
  }
}

The construct can be used like the following (the code also demonstrates 
usage of some other expressions):

def makeBufferedStream(in, maxDataSize) : Stream {
  def rq := makeRequestQueue()
  def data := null
  def eofSeen := false
  def closed := false
  return object {
    to ensureOpen() {
      if(closed) {throw StreamClosed}
    }
    to read(clientMaxDataSize) : any{
      return serialized(rq) {
        ensureOpen()
        seq {
          ensureHasData()
          if(data == null) {
            EOF
          } else {
            if(data.length <= clientMaxDataSize) {
              def rc := data
              data := null
              rc
            } else {
              def rc := data.prefix(clientDataSize)
              data := data.suffix(clientDataSize)
              rc
            }
          }
        }
      }
    }
    to enshureHasData() : any {
      return if(data == null) {
        if(!eofSeen) {
          seq {
            wait readData := in.read(maxDataSize)
            if(readData == EOF) {
              eofSeen = true
            } else {
              data := readData
            }
          }
        } else {
          null
        }
      } else {
        null
      }
    }
    to close() : any {
      return serialized(rq) {
        ensureOpen();
        closed := true
        in.close();
      }
    }
  }
}

Constanine



More information about the e-lang mailing list