1 A low level example.
I will show a simple low level example of coroutines: a pipeline consisting of a source, a transducer, a limiter, and a sink.
1.1 The source
First we have our source, which just writes all the integers starting from 0 down the channel out. The type %>int is an output channel endpoint for int.
examples/corout/ex1.flx
proc source (out: %>int) () {
  var i = 0;
  while true do
    write (out,i);
    ++i;
  done
}
1.2 The sink
Now we have a simple sink, which just prints everything it reads:
examples/corout/ex1.flx
proc sink (inp: %<int) () {
  while true do
    var i = read inp;
    println$ i;
  done
}
You should note the extra unit parameter (). After the input is bound, the resulting procedure will have type unit -> unit, which is the type required to spawn a fibre from a coroutine.
1.3 The transducer
Now we have a transducer which writes the square of every integer it reads.
examples/corout/ex1.flx
proc squarer (inp: %<int, out: %>int) () {
  while true do
    var i = read inp;
    write (out, i * i);
  done
}
1.4 The limiter
You should notice the previous three components were all infinite loops. Since we don't want our demo to run forever we will throw in a limiter chip:
examples/corout/ex1.flx
proc limiter (var limit: int) (inp: %<int, out: %>int) () {
  while limit > 0 do
    var i = read inp;
    write (out, i);
    --limit;
  done
}
This procedure drops through, but it cannot return control, because there is nowhere to return it to, so it has, in effect, committed suicide implicitly.
1.5 The runner
Now we have four components, so we need three channels to connect them:
examples/corout/ex1.flx
proc run_pipeline() {
  // make channels
  var i1,o1 = mk_ioschannel_pair[int]();
  var i2,o2 = mk_ioschannel_pair[int]();
  var i3,o3 = mk_ioschannel_pair[int]();
  // make coroutines by binding the channels
  var d1 = source(o1);
  var d2 = squarer(i1,o2);
  var d3 = limiter 8 (i2,o3);
  var d4 = sink (i3);
  // spawn coroutines as fibres
  spawn_fthread d1;
  spawn_fthread d2;
  spawn_fthread d3;
  spawn_fthread d4;
  // exit
}
run_pipeline();
Here is the complete program: ex1.flx
2 Using the chip DSSL
The code above involves a bit of boilerplate, so I'm going to show you how to do the same job using the chip DSSL. A DSSL is a unique feature of Felix which allows the user to define an extension to the existing language for use in a specific domain. Where other systems have DSLs, or domain specific languages, Felix has DSSLs, or domain specific sub-languages.
Let me show you: first our source:
examples/corout/ex2.flx
chip source
  connector io
    pin out: %>int
{
  var i = 0;
  while true do
    write (io.out,i);
    ++i;
  done
}
This is the same as before except that the channel name now has the form connector.pin and so we put io.out as the channel name. There is no magic here: the connector name is the name of the parameter, and the pin names are fields of a record, so the parameter actually has a record type.
Now here is the sink:
examples/corout/ex2.flx
chip sink
  connector io
    pin inp: %<int
{
  while true do
    var i = read io.inp;
    println$ i;
  done
}
Now here are the transducer and limiter:
examples/corout/ex2.flx
chip squarer
  connector io
    pin inp: %<int
    pin out: %>int
{
  while true do
    var i = read io.inp;
    write (io.out, i * i);
  done
}
chip limiter (var limit: int)
  connector io
    pin inp: %<int
    pin out: %>int
{
  while limit > 0 do
    var i = read io.inp;
    write (io.out, i);
    --limit;
  done
};
Now to connect these things we say this:
examples/corout/ex2.flx
proc run_pipeline1() {
  device limit8 = limiter 8;
  circuit
    connect source.out, squarer.inp
    connect squarer.out, limit8.inp
    connect limit8.out, sink.inp
  endcircuit
}
run_pipeline1();
The circuit statement connects all the chips into a circuit and runs it.
The most important feature here is that the chips are anonymous.
Note the connect clause does not need to use the connector name: it uses the first connector. This particular feature is implemented in the compiler itself to allow I/O direction checking and exhaustion checking. You cannot leave any pin disconnected.
The circuit feature is very general. For pipelines there is an even easier syntax:
examples/corout/ex2.flx
proc run_pipeline2() {
(source |-> squarer |-> limiter 8 |-> sink)();
}
run_pipeline2();
For any functional programmers around, a pipeline is semantically identical to a Monad.
2.1 How does it stop?
It is clear, because I told you, that we put the limiter into the pipeline so it terminates. So the limiter commits suicide by dropping off the end after a certain number of data transfers.
But the other three components are infinite loops, so how does the system terminate?
First consider the transducer: squarer tries to write to the limiter but, alas, it is dead. The write stalls, and we say the fibre is blocked or, being vulgar, constipated. And now, since the squarer is blocked, it cannot read the input from the source, so the source too is blocked.
The sink, on the other hand, has a different problem: it is trying to read from the limiter but the limiter is dead, so there is no input coming. The sink is said to be starved.
There are therefore no active fibres so the system terminates.
I will emphasise this point heavily because these are the correct termination semantics.
There is another language with processes and channels, namely golang. Unfortunately golang got the termination semantics completely wrong. In addition, golang channel semantics are also wrong because channels can be closed and a reader can detect if a writer closed the channel.
Although this is not possible in Felix, you can write values of the option type opt and use the None case to signal end of data. In fact, in Felix you can write any type down a channel, even channels!
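The end-of-data idea above can be sketched outside Felix. The following is a minimal Python illustration, not Felix code: an asyncio.Queue stands in for the channel (it is buffered, so it only approximates a synchronous Felix channel), and Python's None stands in for the None case of opt. The names source, sink, pipeline and demo are made up for this sketch. Note that with a bare None marker the data itself cannot contain None; the Felix opt type avoids that by wrapping real values in Some.

```python
import asyncio

async def source(out, count):
    # Write `count` integers, then an in-band end-of-data marker.
    for i in range(count):
        await out.put(i)
    await out.put(None)  # plays the role of the None case of opt

async def sink(inp, results):
    # Read until the end-of-data marker arrives, then finish normally.
    while True:
        item = await inp.get()
        if item is None:
            return
        results.append(item)

async def pipeline(count):
    channel = asyncio.Queue(maxsize=1)  # stand-in for a channel (assumption)
    results = []
    await asyncio.gather(source(channel, count), sink(channel, results))
    return results

def demo(count=4):
    # Run the two coroutines to completion and return what the sink saw.
    return asyncio.run(pipeline(count))
```

Calling demo(4) returns the values the sink received before the marker. The channel is never closed; the reader simply learns from the data that nothing more is coming.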
3 Asynchronous I/O
Although coroutines are synchronous, Felix supports an extension which allows two kinds of asynchronous I/O.
First, the library contains routines which can perform operations on sockets including read, write, bind, and connect. That subsystem was used to construct flx_web which is the fastest web server in existence and can handle enormous loads. It outperforms other webservers because it associates a fibre, rather than a thread, with each connection, so context switching is lightning fast and occurs in user space.
The socket system also uses advanced socket state notification services such as kqueue on MacOS and epoll on Linux.
However we will demonstrate a simpler service, namely the system alarm clock, because it has a really simple API.
examples/corout/ex3.flx
println$ "Begin Spawning";
spawn_fthread {
  println "Start1";
  sleep (2.0);
  println$ "End1";
};
spawn_fthread {
  println "Start2";
  sleep (1.0);
  println$ "End2";
};
sleep (0.25);
println$ "Mainline terminates";
And here's the output:
Begin Spawning
Start1
Start2
Mainline terminates
End2
End1
The sleep procedure suspends the coroutine; it does not block the pthread running the coroutine system. So after the first coroutine goes to sleep, the mainline continues and spawns the second coroutine. But then, the mainline itself completes and suicides by fall through.
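For readers who know Python, the same behaviour can be approximated with asyncio. This is an analogy only, not how Felix implements sleep: asyncio.sleep suspends one coroutine while the single thread carries on running the others. The names worker, mainline and demo, and the scale parameter that shortens the delays, are made up for this sketch. One difference is labelled in the code: asyncio stops its event loop when the main coroutine returns, so the sketch has to wait for the workers explicitly, which a Felix mainline does not need to do.

```python
import asyncio

async def worker(name, delay, log):
    log.append(f"Start{name}")
    await asyncio.sleep(delay)  # suspends this coroutine only, not the thread
    log.append(f"End{name}")

async def mainline(log, scale):
    log.append("Begin Spawning")
    t1 = asyncio.create_task(worker(1, 2.0 * scale, log))
    t2 = asyncio.create_task(worker(2, 1.0 * scale, log))
    await asyncio.sleep(0.25 * scale)  # both workers start while we sleep
    log.append("Mainline terminates")
    # asyncio-specific: keep the event loop alive until the workers finish.
    await asyncio.gather(t1, t2)
    return log

def demo(scale=0.1):
    # `scale` shrinks the 2.0 / 1.0 / 0.25 second delays of the example.
    return asyncio.run(mainline([], scale))
```

With the default scale the run takes about 0.2 seconds and the log records the same sequence of events as the Felix output shown above.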
3.1 Felix is a coroutine
Most programming languages generate programs which consist of a single top level procedure, for example main() in C. It is a subroutine called by the startup code, which in turn was invoked by the operating system.
Felix, as usual, is different. For a start, Felix generates libraries; in fact the default is a shared library. So what you think is mainline code running is in fact just the initialisation code for the library. You can also add a procedure flx_main() to your code and that will also be run after the initialisation of the library.
However there is something else different: the Felix mainline code is not a subroutine, it is a coroutine! The mainline can happily terminate and leave other fibres running: there's really nothing special about your main program code, it's just another coroutine.
The Felix startup code creates a scheduler to run coroutines, and your program code is just the initial coroutine. So your program terminates when there is nothing left to do precisely because it's just another coroutine.
3.2 Running subsystems
There is a constraint on Felix coroutines: they have to be procedures. Functions cannot read and write channels. However there is a way around this problem: you can use the run subroutine to create and run a new, nested, scheduler.
The run subroutine, being a subroutine, returns when it is finished.
Unfortunately there is another constraint on nested schedulers: they cannot do asynchronous I/O.
4 The concept of fibration
A coroutine system is a collection of processes called fibres with these properties:
- Exactly one process can be running at once, the others are said to be suspended
- Control cannot be pre-empted, but is yielded voluntarily
The running fibre can perform these operations:
- Spawn a new fibre
- Read from a channel
- Write to a channel
- Commit suicide
Each fibre is in one of these states:
- Running
- Active (but not running)
- Waiting on read
- Waiting on write
- Dead
A channel is an object associated with a set of fibres, which are either all waiting to read or all waiting to write.
4.1 Operations
4.1.1 write
When a write operation is performed on a channel by the running fibre, if there are fibres on the channel which are waiting to read then one is selected, the data to be written is transferred from the writer to the reader, the reader is removed from association with the channel, and both the running fibre and the reader are made active. Otherwise, if there are no readers waiting on the channel, the writer is suspended and put on the channel.
4.1.2 read
When a read operation is performed on a channel by the running fibre, if there are fibres on the channel which are waiting to write then one is selected, the data to be written is transferred from the writer to the reader, the writer is removed from association with the channel, and both the running fibre and the writer are made active. Otherwise, if there are no writers waiting on the channel, the reader is suspended and put on the channel.
4.1.3 suicide
When a fibre commits suicide, it is removed as the running fibre.
4.1.4 spawn
When a new fibre is spawned, both the spawner and spawnee are made active.
4.1.5 Scheduler operation
When there is no running fibre, then if an active fibre exists, an active fibre is selected and promoted to running state and the thread of control begins executing it.
When there is no running fibre, and no active fibres, the whole system terminates. Any fibres waiting on I/O become dead.
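The rules above are small enough to model directly. The following is a minimal sketch in Python, not the Felix scheduler: fibres are Python generators that yield a request tuple, and the names Channel, run, mainline and demo, as well as the ("spawn", ...), ("read", ...) and ("write", ...) request protocol, are assumptions made for this illustration. It rebuilds the source, squarer, limiter and sink pipeline from the first example and shows that the system stops by itself once no fibre is active.

```python
from collections import deque

class Channel:
    """Holds fibres that are all waiting to read, or all waiting to write."""
    def __init__(self):
        self.readers = deque()  # suspended reader fibres
        self.writers = deque()  # suspended (writer fibre, value) pairs

def run(main):
    """Run `main` and everything it spawns until no fibre is active."""
    active = deque([(main, None)])       # (fibre, value to resume it with)
    while active:                        # nothing active: system terminates
        fibre, value = active.popleft()  # promote an active fibre to running
        try:
            op, arg, *rest = fibre.send(value)
        except StopIteration:
            continue                     # fell off the end: suicide
        if op == "spawn":
            # Both spawnee and spawner are made active.
            active.extend([(arg, None), (fibre, None)])
        elif op == "read":
            if arg.writers:
                writer, data = arg.writers.popleft()
                active.extend([(fibre, data), (writer, None)])
            else:
                arg.readers.append(fibre)        # suspend on the channel
        elif op == "write":
            if arg.readers:
                reader = arg.readers.popleft()
                active.extend([(reader, rest[0]), (fibre, None)])
            else:
                arg.writers.append((fibre, rest[0]))

def source(out):
    i = 0
    while True:
        yield ("write", out, i)
        i += 1

def squarer(inp, out):
    while True:
        i = yield ("read", inp)
        yield ("write", out, i * i)

def limiter(limit, inp, out):
    while limit > 0:
        i = yield ("read", inp)
        yield ("write", out, i)
        limit -= 1

def sink(inp, results):
    while True:
        results.append((yield ("read", inp)))

def mainline(results):
    c1, c2, c3 = Channel(), Channel(), Channel()
    yield ("spawn", source(c1))
    yield ("spawn", squarer(c1, c2))
    yield ("spawn", limiter(8, c2, c3))
    yield ("spawn", sink(c3, results))

def demo():
    results = []
    run(mainline(results))
    return results
```

Calling demo() returns the eight squares the sink collected. After the limiter falls off the end, the source and squarer are left waiting on write and the sink waiting on read, so the active queue empties and run simply returns.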
4.2 Communicating Sequential Coroutines
The description above is a complete account of the abstract semantics known as Communicating Sequential Coroutines or CSC. It is a subsystem obtained from Tony Hoare's Communicating Sequential Processes or CSP, in which concurrency is replaced by an indeterminate total ordering.
To be precise in CSC all events are totally ordered, however the exact ordering is not determinate. This is because a scheduler running the system is free to pick any active fibre to run when one is needed; similarly a read or write operation can match with any dual operation on a channel.
In the Felix implementation, a spawned fibre runs immediately if the spawn_fthread procedure is used to launch it, or the running fibre continues instead if schedule_fthread is used.
In addition, I/O transfers always result in the reader proceeding, to give it a chance to actually fetch the data before the write might modify it.
5 No Deadlocks
Most threading systems can deadlock. Here is the classical situation:
examples/corout/ex5.flx
chip D
  connector io
    pin inp: %<int
    pin out: %>int
{
  while true do
    var x = read io.inp;
    write (io.out, x);
  done
}
device A = D;
device B = D;
circuit
  connect A.out, B.inp
  connect B.out, A.inp
endcircuit
println$ "Done";
Both devices start by reading, but there's nothing to read because neither can proceed to writing. So the system hangs, right?
Er .. NO! Both chips are suspended, the mainline completes, and the scheduler terminates the program because there is no work to do.
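The same outcome can be reproduced in a toy model. The following Python sketch is an illustration under stated assumptions, not the Felix runtime: fibres are generators, and Channel, run, echo and demo are names made up here. Two fibres each start by reading from the other, both end up suspended, and the scheduler loop returns because nothing is active.

```python
from collections import deque

class Channel:
    def __init__(self):
        self.readers = deque()  # fibres suspended in a read
        self.writers = deque()  # (fibre, value) pairs suspended in a write

def run(fibres):
    """Run until no fibre is active; return how many are left waiting."""
    active = deque((f, None) for f in fibres)
    waiting = 0
    while active:
        fibre, value = active.popleft()
        try:
            op, chan, data = fibre.send(value)
        except StopIteration:
            continue  # fibre fell off the end
        if op == "read":
            if chan.writers:
                writer, data = chan.writers.popleft()
                waiting -= 1
                active.extend([(fibre, data), (writer, None)])
            else:
                chan.readers.append(fibre)
                waiting += 1
        else:  # "write"
            if chan.readers:
                reader = chan.readers.popleft()
                waiting -= 1
                active.extend([(reader, data), (fibre, None)])
            else:
                chan.writers.append((fibre, data))
                waiting += 1
    return waiting

def echo(inp, out):
    # Same shape as chip D: read a value, then write it back out.
    while True:
        x = yield ("read", inp, None)
        yield ("write", out, x)

def demo():
    a_to_b, b_to_a = Channel(), Channel()
    left_waiting = run([echo(b_to_a, a_to_b), echo(a_to_b, b_to_a)])
    return left_waiting  # control comes back here, so "Done" could be printed
```

Here demo() returns 2: both fibres are still waiting on read, yet run has returned normally instead of hanging.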
Let me repeat in bold letters: fibres cannot deadlock.
However they can livelock, but only if you mismanage manual circuit construction. Here's an example:
examples/corout/ex6.fx
var inp,out = mk_ioschannel_pair[int]();
spawn_fthread {
  write (out, 42);
};
This is a livelock because the mainline could read the written data on the channel endpoint inp but doesn't.