[e-lang] Computing Pi in parallel with several VATs. How to write it well ?

Kevin Reid kpreid at attglobal.net
Sat Jul 23 10:29:06 EDT 2005


On Jul 21, 2005, at 12:25, Pascal Grange wrote:
> I am new to E and I am seeking advice about good programming in E.
> Here is the issue:
...
> So that was for the properties of the algorithm. My way of
> implementing it with E is the following: I have several computers
> (PiComputer), on different remote VATs. I submit a job (an interval)
> to compute asynchronously to each computer. When a computer has
> finished its computation, I add its result to the current value of Pi
> being computed and I send the computer a new job. When there are no
> more jobs to do and no computation is being performed, the algorithm
> stops and displays the value of Pi computed.
...
> This code is too spaghetti-like for me, but I was not able to do it
> better. This spaghettiness is not a fact, just a feeling; for
> instance, the printing of the result is in the middle of the code,
> not at the end, and so on. I know that it's asynchronous, but I would
> like to know if an E programmer would code that in a drastically
> different E way.
>

This is an interesting problem, and I'm still looking for a nice
solution; so far, I only have an uglier one.

I thought you might want to see what I have in the meantime. The
structure is different from yours: it keeps a queue of available
computers instead of running one process for each computer. I now think
this is probably the more complex way to write it.

The code below also propagates failures, which avoids the lost-signal
failure mode.

pragma.enable("easy-return")
pragma.disable("explicit-result-guard")
pragma.enable("easy-when")

def makeLamportSlot := <elib:slot.makeLamportSlot>

def makeComputer() {
   def computer {
     to compute(piece) {
       if (entropy.nextInt(5) == 0) {
         return Ref.broken(`BOOM ($piece)`)
       } else {
         return timer.whenPast(timer.now() + entropy.nextInt(2000),
                               thunk{ "'" <- add(piece) <- add("', ") })
       }
     }
   }
   return computer
}

def nor(value, getter) {
   return if (value == null) { getter() } else { value }
}

def observe(everReporter, simpleReactor) {
   everReporter <- whenUpdated(def fancyReactor {
     to reactToUpdate(newValue, newReporterGen, optNewReporter) {
       optNewReporter <- whenUpdated(fancyReactor, newReporterGen)
       simpleReactor <- ()
     }
   })
}

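# A queue built as a chain of promises: each link resolves to
# [element, rest], where rest is a promise for the next link.
def makeEventualQueue() {
   def [var queue, var tailResolver] := Ref.promise()
   # Waits for the head link, then sends its element to elementHandler.
   # If the chain ends or breaks, endHandler gets the final value instead.
   def whenQueue(elementHandler, endHandler) {
     return \
       when (queue) -> available {
         if (queue =~ [element, rest]) {
           queue := rest
           elementHandler <- (element)
         } else {
           endHandler <- (queue)
         }
       } catch p {
         endHandler <- (Ref.broken(p))
       }
   }

   # Resolves the current tail to a new link and remembers the new tail.
   def push(value) {
     def [rest, resolver] := Ref.promise()
     tailResolver.resolve([value, rest])
     tailResolver := resolver
   }

   return [whenQueue, push]
}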

def [whenAvailableComputer, pushAvailableComputer] :=
   makeEventualQueue()

for _ in 1..3 { pushAvailableComputer(makeComputer()) }

def distribute(pieces :Set, seed, traceOut) {

   def piecesLeft := pieces.getElements().diverge()
   def &accumulation := makeLamportSlot(seed)
   var busy := 0
   def result

   def recur() {
     traceOut.println(
       `${piecesLeft.size()} piece(s) left, $busy in progress`)
     if (piecesLeft.size() > 0 && !Ref.isBroken(accumulation)) {
       traceOut.println(`waiting for a computer`)
       whenAvailableComputer(
         def gotAvailable(computer) {
           busy += 1
           when (computer <- compute(piecesLeft.pop())) \
               -> computed(resultPiece) {
             pushAvailableComputer(computer)
             accumulation := accumulation <- add(resultPiece)
           } catch p {
             accumulation := Ref.broken(p)
           } finally {
             busy -= 1
           }
           recur()
         },
         def endAvailable(queueEnd) {
           accumulation := Ref.broken(
                             nor(Ref.optProblem(queueEnd),
                             thunk{"Ran out of computers."}))
           recur()
         })
     } else {
       traceOut.println(`exiting supply loop`)
     }
   }

   observe(&accumulation, def checkFinish() {
     if (!Ref.isResolved(result) &&
         ((busy + piecesLeft.size()) <=> 0 ||
          Ref.isBroken(accumulation))) {
       bind result := accumulation
     }
   })

   recur()
   return result
}

def intervals := ["A", "B", "C", "D", "E", "F", "G"].asSet()

when (distribute(intervals, "", stderr)) -> computed(result) {
   println(`The answer is $result.`)
} catch p {
   println(`Oops, $p: ${p.eStack()}`)
}
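
For comparison, the other structure mentioned above (one process per
computer) might look roughly like the sketch below. It is written in
Python's asyncio rather than E, so none of it is E API, and the names
compute, worker, distribute_per_worker, run, and fail_on are all
hypothetical. The stand-in computer just quotes its piece, and instead
of failing at random it fails only on the piece you name, so that the
failure path can be exercised on purpose.

```python
import asyncio

async def compute(piece, fail_on=None):
    # Stand-in for a remote computer: quote the piece, or fail on request.
    await asyncio.sleep(0)
    if piece == fail_on:
        raise RuntimeError(f"BOOM ({piece})")
    return f"'{piece}', "

async def worker(pieces, results, fail_on):
    # One loop per computer: keep taking pieces until none are left.
    while pieces:
        piece = pieces.pop()
        results.append(await compute(piece, fail_on))

async def distribute_per_worker(pieces, n_computers=3, fail_on=None):
    pieces = list(pieces)
    results = []
    # gather() re-raises the first worker failure, so errors are not lost.
    await asyncio.gather(
        *(worker(pieces, results, fail_on) for _ in range(n_computers)))
    return "".join(results)

def run(pieces, **kwargs):
    # Convenience wrapper so the sketch can be called from plain code.
    return asyncio.run(distribute_per_worker(pieces, **kwargs))
```

Calling run(["A", "B", "C"]) returns the quoted pieces joined in
whatever order the workers finished, and run(["A", "B"], fail_on="B")
raises the RuntimeError instead of returning a partial answer.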

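The promise-chain idea behind makeEventualQueue can also be sketched
outside E. The version below is a rough Python asyncio analogue, not a
translation: EventualQueue, take, close, demo, and run_demo are
hypothetical names, and a closed queue is signalled with LookupError
rather than an end handler. Like the E code, it assumes only one taker
is waiting at a time; two concurrent takers would see the same element.

```python
import asyncio

class EventualQueue:
    """Queue built as a chain of futures resolving to (element, rest)."""

    def __init__(self):
        # Must be created while an event loop is running.
        self._head = asyncio.get_running_loop().create_future()
        self._tail = self._head

    def push(self, value):
        # Resolve the current tail to a new link, then track the new tail.
        rest = asyncio.get_running_loop().create_future()
        self._tail.set_result((value, rest))
        self._tail = rest

    def close(self):
        # Resolve the tail to None: no further elements will arrive.
        self._tail.set_result(None)

    async def take(self):
        # Wait for the head link; advance past it if it holds an element.
        cell = await self._head
        if cell is None:
            raise LookupError("queue ended")
        element, self._head = cell
        return element

async def demo():
    q = EventualQueue()
    taker = asyncio.ensure_future(q.take())  # waits: nothing pushed yet
    await asyncio.sleep(0)
    q.push("computer-1")
    q.push("computer-2")
    first = await taker
    second = await q.take()
    q.close()
    try:
        await q.take()
        ended = False
    except LookupError:
        ended = True
    return first, second, ended

def run_demo():
    return asyncio.run(demo())
```

The point of the chain is that a taker can start waiting before
anything has been pushed, which is what whenAvailableComputer relies on
when all computers are busy.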

