ExpandCollapse

+ 1 A low level example.

I will show a simple low level example of coroutines, it will be a simple pipeline consisting of a source, a transducer, a limiter, and a sink.

+ 1.1 The source

First we have our source, which just write all the integers starting from 0 down the channel out. The type %>int is an output channel endpoint for int.

examples/corout/ex1.flx

  proc source (out: %>int) () {
    var i = 0;
    while true do 
      write (out,i); 
      ++i; 
    done
  }

+ 1.2 The sink

Now we have a simple sink, which just prints everything it reads:

examples/corout/ex1.flx

  proc sink (inp: %<int) () {
    while true do 
      var i = read inp;
      println$ i;
    done
  }

You should note the extra unit parameter (). After the input is bound, the resulting procedure will have type

  unit -> unit

which is the type required to spawn fibre from a coroutine.

+ 1.3 The transducer

Now we have a transducer which writes the square of every integer it reads.

examples/corout/ex1.flx

  proc squarer (inp: %<int, out: %>int) () {
    while true do 
      var i = read inp;
      write (out, i * i);
    done
  }

+ 1.4 The limiter

You should notice the previous three components were all infinite loops. Since we don't want our demo to run forever we will throw in a limiter chip:

examples/corout/ex1.flx

  proc limiter (var limit: int) (inp: %<int, out: %>int) () {
    while limit > 0 do
      var i = read inp;
      write (out, i);
      --limit;
    done
  }

This procedure drops through, but it cannot return control, because there is nowhere to return it to, so it has, in effect, committed suicide implicitly.

+ 1.5 The runner

Now we need have four components so we need three channels to connect them:

examples/corout/ex1.flx

  proc run_pipeline() {
    // make channels
    var i1,o1 = mk_ioschannel_pair[int](); 
    var i2,o2 = mk_ioschannel_pair[int](); 
    var i3,o3 = mk_ioschannel_pair[int](); 
  
    // make coroutines by binding the channels
    var d1 = source(o1);
    var d2 = squarer(i1,o2);
    var d3 = limiter 8 (i2,o3);
    var d4 = sink (i3);
  
    // spawn coroutines as fibres
    spawn_fthread d1;
    spawn_fthread d2;
    spawn_fthread d3;
    spawn_fthread d4;
   
    // exit
  }
  run_pipeline();

Here is the complete program: ex1.flx

+ 2 Using the chip DSSL

The code above involves a bit of boilerplate so I'm going to show you how to do the same job using the chip DSSL. A DSSL is a unique feature of Felix which allows the user to define an extension to the existing language, which has an application in a specific domain. Unlike DSLs or domain specific languages, Felix has DSSLs or domain specific sub-languages.

Let me show you: first our source:

examples/corout/ex2.flx

  chip source
    connector io
     pin out: %>int
  {
    var i = 0;
    while true do 
      write (io.out,i); 
      ++i; 
    done
  }

This is the same as before except that the channel name now has the form connector.pin and so we put io.out as the channel name. There is no magic here, the connector name is the name of the parameter, and the pin names are fields of a record, so the parameter actually has a record type.

Now here is the sink:

examples/corout/ex2.flx

  chip sink 
    connector io
      pin inp: %<int
  {
    while true do 
      var i = read io.inp;
      println$ i;
    done
  }

Now here is the transducer and limiter

examples/corout/ex2.flx

  chip squarer 
    connector io
      pin inp: %<int
      pin out: %>int
  {
    while true do 
      var i = read io.inp;
      write (io.out, i * i);
    done
  }
  chip limiter (var limit: int) 
    connector io
      pin inp: %<int
      pin out: %>int
  {
    while limit > 0 do
      var i = read io.inp;
      write (io.out, i);
      --limit;
    done
  };

Now to connect these things we say this:

examples/corout/ex2.flx

  proc run_pipeline1() {
    device limit8  = limiter 8;
    circuit
      connect source.out, squarer.inp
      connect squarer.out, limit8.inp
      connect limit8.out, sink.inp
    endcircuit
  }
  run_pipeline1();

The circuit statement connects all the chips into a circuit and runs it. The most important feature here is that the chips are anonymous. Note the connect clause does not need to use the connector name, it uses the first connector. This particular feature is implemented in the compiler itself to allow I/O direction checking and exhaustion checking. You cannot leave any pin disconnected.

The circuit feature is very general. For pipelines there is an even easer syntax:

examples/corout/ex2.flx

  proc run_pipeline2() { 
    (source |-> squarer |-> limiter 8 |-> sink)();
  }
  run_pipeline2();

For any functional programmers around, a pipeline is semantically identical to a Monad.

+ 2.1 How does it stop?

It is clear, because I told you, that we put the limiter into the pipeline so it terminates. So the limiter commits suicide by dropping off the end after a certain number of data transfers. But the other three components are infinite loops, so how does the system terminate?

First consider the transducer squarer tries to write to the limiter, but alas, it is dead? The write stalls, and we say the fibre is blocked or being vulgar, constipated. And now, since it is blocked, it cannot read the input from the source, so it too is blocked.

The sink on the other hand has a different problem: it is trying to read from the limiter but the limiter is dead, so there is no input coming: the sink is said to be starved.

There are therefore no active fibres so the system terminates. I will emphasise this point heavily because these are the correct termination semantics. There is another language with processes and channels, namely golang. Unfortunately golang got the termination semantics completely wrong. in addition, golang channel semantics are also wrong because channels can be closed and a reader can detect if a writer closed the channel.

Although this is not possible in Felix, you can write the option type opt and use the None case to signal end of data. In fact, in Felix you can write any type down a channel, even channels!

+ 3 Asynchronous I/O

Although coroutines are synchronous Felix supports an extension which allows two kinds of asynchronous I/O.

First, the library contains routines which can perform operations on sockets including read, write, bind, and connect. That subsystem was used to construct flx_web which is the fastest web server in existence and can handle enormous loads. It outperforms other webservers because it associates a fibre, rather than a thread, with each connection, so context switching is lightning fast and occurs in user space.

The socket system also uses advanced socket state notificantion services such as kqueue on MacOS and epoll on Linux.

However we will demonstrate a simpler service, namely the system alarm closk because it has a really simply API.

examples/corout/ex3.flx

  println$ "Begin Spawning";
  
  spawn_fthread {
    println "Start1"; sleep (2.0); println$ "End1";
  };
  
  spawn_fthread {
    println "Start2"; sleep (1.0); println$ "End2";
  };
  
  sleep (0.25);
  println$ "Mainline terminates";

And here's the output:

Begin Spawning
Start1
Start2
Mainline terminates
End2
End1

The sleep procedure suspends the coroutine, it does not block the pthread running the coroutine system. So aftedr the first coroutine goes to sleep, the mainline continues and spawns the second coroutine. But then, the mainline itself completes and suicides by fall through.

+ 3.1 Felix is a coroutine

Most programming language generate programs which consist of single top level procedure, for example main() in C. It is a subroutine called by the startup code which in turn was invoked by the operating system.

Felix, as usual, is different. For a start, Felix generates libraries, in fact the default is a shared library. So in fact, what you think is mainline code running is in fact just the initialisation code for the library. You can also add a procedure flx_main() to your code and that will also be run after the initialisation of the library.

However there is something else different: the Felix mainline code is not a subroutine, it is a coroutine! The mainline can happily terminate and leave other fibres running: there's really nothing special about your main program code, it's just another coroutine.

The Felix startup code creates a scheduler to run coroutines, and your program code is just the initial coroutine. So your program terminates when there is nothing left to do precisely because it's just another coroutine.

+ 3.2 Running subsystems

There is a constraint on Felix coroutines: they have to be procedures. Functions cannot read and write channels. However there is a way around this problem: you can use the run subroutine to create an run a new, nested, scheduler.

The run subroutine, being a subroutine, returns when it is finished.

Unfortunately there is another constraint on nested schedulers: they cannot do asynchronous I/O.

+ 4 The concept of fibration

A coroutine system is a collection of processes called fibres with these properties:

  • Exactly one process can be running at once, the others are said to be suspended
  • Control cannot be pre-empted, but is yielded voluntarily
In our system, there are four principle operations which yield control:
  1. spawn a new fibre
  2. read from a channel
  3. write to a channel
  4. Commit suicide
A fibre of control can be in one of five states:
  1. Running
  2. Active (but not running)
  3. Waiting on read
  4. Waiting on write
  5. Dead
In our system, all the resources associated by an unreachable dead fibre are automatically reaped by the garbage collector so that in fact the fibre is not technically dead because it no longer exists.

A channel is an object associated with which set of fibres all waiting to read or all waiting to write.

+ 4.1 Operations

+ 4.1.1 write

When a write operation is performed on a channel by the running fibre, if there are fibres on the channel which are waiting to read then one is selected, the data to be written is tranfered from the writer to the reader, the fibre is removed from association with the channel, and both the running and reader fibre are made active. Otherwise if there is no readers waiting on the channel, the writer is suspended and put on the channel.

+ 4.1.2 read

When a read operation is performed on a channel by the running fibre, if there are fibres on the channel which are waiting to write then one is selected, the data to be written is transfered from the writer to the reader, fibre, the fibre is removed from association with the channel, and both the running and reader fibre are made active. Otherwise if there is no writers waiting on the channel, the writer is suspended and put on the channel.

+ 4.1.3 suicide

When a fibre commits suicide, it is removed as the running fibre.

+ 4.1.4 spawn

When a new fibre is spawned, both the spawner and spawnee are made active.

+ 4.1.5 Scheduler operation

When there is no running fibre, then if an active fibre exists, an active fibre is selected and promoted to running state and the thread of control begins executing it.

When there is no running fibre, and no active fibres, the whole system terminates. Any fibres waiting on I/O become dead

+ 4.2 Communication Sequential Coroutines

The description above is a complete account of the abstract semantics known as Communicating Sequential Coroutines or CSC, it is a sub system obtain from Tony Hoare's Communicating Sequenmtial Processes or CSP, in which concurrency is replaced by an indeterminate total ordering.

To be precise in CSC all events are totally ordered, however the exact ordering is not determinate. This is because a scheduler running the system is free to pick any active fibre to run when one is needed; similarly a read or write operation can match with any dual operation on a channel.

In the Felix implementation, a spawned fibre runs immediately if the spawn_fthread procedure is used to launch it, or the running fibre continues instead, of schedule_fthread is used instead.

In addition, I/O transfers always result in the reader proceeding, to give it a chance to actually fetch the data before the write might modify it.

+ 5 No Deadlocks

Most threading systems can deadlock. Here is the classical situation:

examples/corout/ex5.flx

  chip D connector io pin inp: %<int pin out: %>int 
  {
     while true do
       var x = read io.inp;
       write (io.out, x);
     done
  }
  device A = D;
  device B = D;
  circuit 
    connect A.out, B.inp
    connect B.out, A.inp
  endcircuit
  println$ "Done";

Both devices start by reading, so there's nothing to read because neither can proceed to writing. So the system hangs, right!

Er .. NO! Both chips are suspended, the mainline completes, and the scheduler terminates the program because there is no work to do.

Let me repeat in bold letters:

FELIX COROUTINES CANNOT DEADLOCK

However they can livelock, but only if you mismanage manual circuit construction. Here's an example:

examples/corout/ex6.fx

var inp,out = mk_ioschannel_pair[int]();
spawn_fthread { write (out, 42); };

This is a livelock because the mainline could read the written data on the channel endpoint inp but doesn't.