1 Fibres (fthreads)
share/lib/std/control/fibres.flx
//$ Low level management of Felix fthreads (fibres).
open class Fibres
{
  // Primitive binding: clone the procedure closure and call it with an
  // argument, yielding a continuation.
  private gen _start[t]: (t->0)*t->cont = "$1->clone()->call(0,$2)";

  //$ Function to start a continuation with argument type t.
  gen start[t] (p:t->0) (x:t) = { return _start (p,x); }

  // Primitive binding: clone the procedure closure and call it with no
  // argument, yielding a continuation.
  private fun _start0: (1->0)->cont = "$1->clone()->call(0)";

  //$ Function to start a continuation without an argument.
  gen start (p:1->0) = { return _start0 (p); }

  // compiler looks this up do not change
  gen _continuation_start (p:1->0) = { return _start0 (p); }

  //$ Function to make a fibre out of a continuation.
  gen mk_thread: cont->fthread =
    "new(*ptf-> gcp,::flx::rtl::_fthread_ptr_map,false) ::flx::rtl::fthread_t($1,nullptr)"
  ;

  // Spawn a fibre on this fibres scheduler.
  // uses a supervisor call so can't be used in a function
  proc spawn_fthread(p:1->0)
  {
    var con = start p;              // get continuation of p
    var fthr = mk_thread con;
    svc$ svc_spawn_fthread fthr;
  }

  // Issue the spawn service request for an already constructed fthread.
  proc _svc_fthread (fthr:fthread) { svc$ svc_spawn_fthread fthr; }

  // Same construction as spawn_fthread, but issues svc_schedule_fthread
  // instead of svc_spawn_fthread.
  proc schedule_fthread(p:1->0)
  {
    var con = start p;              // get continuation of p
    var fthr = mk_thread con;
    svc$ svc_schedule_fthread fthr;
  }

  // Throws a NULL continuation pointer.
  proc suicide: 1 = "throw (con_t*)NULL;";

  // Returns the given continuation from the current C++ routine.
  proc chain : cont = "return $1;" requires property "heap_closure";

  // *********************************************************
  // NESTED SYNC SCHEDULER
  // NOTE: deprecated in favour of async scheduler below
  // *********************************************************

  //$ The type of a fibre scheduler.
  // NOTE(review): the header literal below contains only a space; the
  // original include text appears to have been lost -- restore it from
  // the upstream source before compiling.
  type fibre_scheduler = "::flx::run::sync_sched*"
    requires header ' '
  ;

  //$ Construct a fibre scheduler.
  //$ NOTE: NOW GARBAGE COLLECTED!
  ctor fibre_scheduler: bool = """
    new(*ptf-> gcp,::flx::run::sync_sched_ptr_map,false)
    ::flx::run::sync_sched
    (
      $1,
      ptf-> gcp,
      new(*ptf-> gcp, ::flx::run::fthread_list_ptr_map, false) ::flx::run::fthread_list(ptf-> gcp)
    )
  """
  ;

  // Default constructor: the debug flag is true when the environment
  // variable FLX_DEBUG_DRIVER is set to a non-empty string.
  ctor fibre_scheduler () =>
    fibre_scheduler (Env::getenv "FLX_DEBUG_DRIVER" != "")
  ;

  //$ Spawn a fibre on a given scheduler with a given continuation.
  //$ Note: does NOT run it!
  //$ FIXME: no mutex guard!!
  proc spawn_fibre: fibre_scheduler * fthread = """
    $1->active->push_back($2);
  """;

  // NOTE(review): the header literal below contains only a space; the
  // original include text appears to have been lost -- restore it from
  // the upstream source before compiling.
  proc frun: (1->0) = "::flx::rtl::executil::frun (ptf-> gcp, $1);"
    requires header ' '
  ;

  // Run the given scheduler.
  proc run: fibre_scheduler = "$1->frun();";

  // Create a fresh sync scheduler, put p on it as a fibre, and run it.
  proc run (p: 1 -> 0)
  {
    var s = fibre_scheduler();
    spawn_fthread s p;
    s.run;
  }

  //$ The type of the stop state of the fibre scheduler.
  //$ terminated: the scheduler is terminated.
  //$ blocked: the scheduler is out of threads to run.
  //$ delegated: the scheduler has been issued a service
  //$ request by a thread which it cannot satisfy.
  //$ The scheduler is put in delegated state and waits
  //$ for another service to satisfy the request and put
  //$ it back in operation.
  //$
  //$ Note: there is no "operating" state because the
  //$ stop state can only be queried by the scheduler's caller
  //$ when the scheduler returns control to it.
  enum fibre_scheduler_state {
    terminated,
    blocked,
    delegated
  };

  fun get_state : fibre_scheduler -> fibre_scheduler_state = "$1->fs";

  //$ Core user procedure for launching a fibre.
  proc spawn_fthread (fs:fibre_scheduler) (p:1->0)
  {
    spawn_fibre (fs,p.start.mk_thread);
  }

  // *********************************************************
  // ASYNC SCHEDULER
  // *********************************************************
  // FIXME: it is leaked .. to be fixed shortly

  // async scheduler type
  // NOTE(review): the header literal below contains only a space; the
  // original include text appears to have been lost -- restore it from
  // the upstream source before compiling.
  type async_scheduler = "::flx::run::async_sched*"
    requires header ' ',
    package "flx_arun"
  ;

  // async scheduler constructor
  ctor async_scheduler: bool = """
    new ::flx::run::async_sched
    (
      ptf-> world,   // world object
      $1,            // debug driver flag
      ptf-> gcp,     // GC profile object
      new(*ptf-> gcp, ::flx::run::fthread_list_ptr_map, false) ::flx::run::fthread_list(ptf-> gcp),
      ::flx::run::async_sched::mainline // temporary hack! thread kind (should be inherited)
    )
  """
  ;

  // async scheduler constructor wrapper
  ctor async_scheduler () =>
    async_scheduler (Env::getenv "FLX_DEBUG_DRIVER" != "")
  ;

  // spawn fibre on async scheduler from fthread object
  proc spawn_fibre: async_scheduler * fthread = """
    $1->ss->active->push_back($2);
  """;

  // spawn fibre on async scheduler from procedure
  proc spawn_fthread (fs:async_scheduler) (p:1->0)
  {
    spawn_fibre (fs,p.start.mk_thread);
  }

  proc prun: async_scheduler = "$1->prun();";

  // Create a fresh async scheduler, put p on it as a fibre, and run it.
  proc async_run (p: 1 -> 0)
  {
    var s = async_scheduler();
    spawn_fthread s p;
    s.prun;
  }

  // *********************************************************
  // MISC STUFF THAT MAY NOT BE USED, CONSIDER DELETING IT
  // UNRELIABLE ANYHOW .. CHECK PLUGINS ...
  // *********************************************************

  //$ Execute a single step of a fibre.
  gen step: cont -> cont = "$1->resume()";

  //$ Schedule death of a fibre.
  proc kill: fthread = "$1->cc = 0;";

  //$ Run a continuation until it terminates.
  //$ Do not use this proc if the underlying
  //$ procedure attempts to read messages.
  //$ This is a low level primitive, bypassing fthreads.
  proc run: cont = "::flx::rtl::executil::run($1);" requires package "flx_executil";

  // C++ implementation of send. The line structure inside this string
  // matters: it contains C++ line comments.
  private proc _send[t]: &cont * t =
  """
  {
    using namespace ::flx::rtl;
    con_t *tmp = *(con_t**)$1.get_data();
    // run target until it reaches a service request (or death)
    while(tmp && (!tmp->p_svc || tmp->p_svc->variant == svc_yield)) {
      try { tmp=tmp->resume(); }
      catch (con_t *x) { tmp = x; }
    }
    // check it is alive and making the expected service request
    if (!tmp)
      throw flx_exec_failure_t (__FILE__,"send","Send to terminated procedure");
    if (!tmp->p_svc)
      throw flx_exec_failure_t (__FILE__,"send","Send to unready Procedure");
    if (tmp->p_svc->variant != svc_read)
      throw flx_exec_failure_t (__FILE__,"send","Send to Procedure which is not trying to read");
    // store the message
    **(?1**)tmp->p_svc->data= $2;
    // clear the service request
    tmp->p_svc = 0;
    // run the target until the next service request (or death)
    while(tmp && (!tmp->p_svc || tmp->p_svc->variant == svc_yield)) {
      try { tmp=tmp->resume(); }
      catch (con_t *x) { tmp = x; }
    }
    // save the new continuation
    *(con_t**)$1.get_data() = tmp;
  }
  """;

  //$ Send a message to a continuation.
  //$ There is no type checking on the message type.
  //$ The procedure is executed until
  //$ the next wait_state, then the message is stored.
  //$ Low level primitive, bypassing fthreads.
  proc send[t] (p:&cont) (x:t)
  {
    _send (p,x);
  }
}
2 Synchronous Channels
share/lib/std/control/schannels.flx
//$ Synchronous Channels.
//$ Used to exchange control and possibly data
//$ between Felix f-threads (aka fibres).
open class Schannel
{
  // SCHANNEL PRIMITIVE
  _gc_pointer type _schannel = "::flx::rtl::schannel_t*";

  // PRIMITIVE CONSTRUCTOR
  ctor _schannel: 1 =
    "new(*ptf-> gcp,::flx::rtl::schannel_ptr_map,false) ::flx::rtl::schannel_t()"
    requires property "needs_gc"
  ;

  // THEORETICALLY UNSOUND
  // The cast names schannel_t, the same runtime type used by every other
  // binding in this class.
  gen mk_null_channel : 1 -> _schannel = "(::flx::rtl::schannel_t*)NULL";
  fun isNULL: _schannel -> bool = "!$1";

  // READ PRIMITIVE
  inline proc read_address (chan:_schannel, loc: &address) {
    svc$ svc_sread$ chan, loc;
    //println$ "read_address: " + loc->str;
  }

  // WRITE PRIMITIVE
  inline proc write_address (chan:_schannel, var v:address) {
    //println$ "write_address: " + v.str;
    svc$ svc_swrite$ chan, &v;
  }

  // TYPED CHANNELS

  //$ The type of a bidirectional synchronous channel.
  _gc_pointer type schannel[t] = "::flx::rtl::schannel_t*";

  //$ The type of an input synchronous channel.
  _gc_pointer type ischannel[t] = "::flx::rtl::schannel_t*";

  //$ The type of an output synchronous channel.
  _gc_pointer type oschannel[t] = "::flx::rtl::schannel_t*";

  //$ Create a bidirectional synchronous channel.
  gen mk_schannel[t]():schannel[t] =>
    C_hack::cast[schannel[t]] #_schannel
  ;

  ctor[T] address: oschannel[T] = "$1";
  ctor[T] address: ischannel[T] = "$1";

  //$ Safe cast from bidirectional to output synchronous channel.
  ctor[t] oschannel[t](x:schannel[t]) => C_hack::cast[oschannel[t]] x;

  //$ Safe cast from bidirectional to input synchronous channel.
  ctor[t] ischannel[t](x:schannel[t]) => C_hack::cast[ischannel[t]] x;

  //$ Make an input and an output channel out of a bidirectional channel.
  gen mk_ioschannel_pair[t](var ch:schannel[t]) =>
    ischannel[t] ch, oschannel[t] ch
  ;

  //$ Construct a connected input and output channel pair.
  gen mk_ioschannel_pair[t]() =>
    mk_ioschannel_pair[t]$ mk_schannel[t] ()
  ;

  // READ POINTER
  inline proc read_pointer[T] (chan:ischannel[&T], p: &&T) {
    read_address$ C_hack::cast[_schannel] chan, C_hack::cast[&address] p;
  }

  // WRITE POINTER
  inline proc write_pointer[T] (chan:oschannel[&T], var p:&T) {
    write_address$ C_hack::cast[_schannel] chan, p.address;
  }

  // pass in address of location to put the pointer to the T data
  proc read[T] (chan:schannel[T], loc: &&T) {
    svc$ svc_sread$ C_hack::cast[_schannel] chan, C_hack::reinterpret[&root::address] (loc);
  }

  // pass in address of location to put the T data
  proc read[T] (chan:schannel[T], p: &T) {
    var loc: &T;
    read (chan, &loc);
    p <- *loc;
  }

  //$ Read an item from a bidirectional channel.
  inline gen read[T] (chan:schannel[T]) = {
    var loc: &T;
    read (chan, &loc);
    return *loc;
  }

  // Input-channel overloads: cast to the bidirectional type and delegate.
  proc read[T] (chan:ischannel[T], loc: &&T) { read (C_hack::cast[schannel[T]] chan, loc); }
  proc read[T] (chan:ischannel[T], p: &T) { read (C_hack::cast[schannel[T]] chan, p); }

  //$ Read an item from an input channel.
  inline gen read[T] (chan:ischannel[T]) => read$ C_hack::cast[schannel[T]] chan;

  //$ Test if channel is ready for a read.
  // The low bit of top is used as a tag; the bit test is parenthesised
  // explicitly because unary ! binds tighter than & in C++.
  // NOTE(review): the two overloads test opposite polarities of the tag
  // bit (the ischannel form negates it, the schannel form does not).
  // Both are kept as written; confirm against the runtime which one is
  // the intended "writer is waiting" condition.
  inline gen ready[T] :ischannel[T] -> bool = "($1->top!=nullptr && !((uintptr_t)$1->top &1u))";
  inline gen ready[T] : schannel[T] -> bool = "($1->top!=nullptr && ((uintptr_t)$1->top &1u))";

  //$ Return Some value if ready, otherwise None
  inline gen maybe_read[T] (chan:ischannel[T]) =>
    if chan.ready then Some chan.read else None[T]
  ;

  inline gen maybe_read[T] (chan:schannel[T]) =>
    if chan.ready then Some chan.read else None[T]
  ;

  //$ Write an item to a bidirectional channel.
  proc write[T] (chan:schannel[T], v:T) {
    var ps = C_hack::cast[root::address]$ new v;  // heap copy of v; its address is what is sent
    svc$ svc_swrite$ C_hack::cast[_schannel] chan, &ps;
  }

  // Output-channel overload: cast to the bidirectional type and delegate.
  proc write[T] (chan:oschannel[T], v:T) {
    write (C_hack::cast[schannel[T]] chan, v);
  }

  //$ Multi Write an item to a bidirectional channel.
  proc broadcast[T] (chan:schannel[T], v:T) {
    var ps = C_hack::cast[root::address]$ new v;
    svc$ svc_multi_swrite$ C_hack::cast[_schannel] chan, &ps;
  }

  //$ Multi Write an item to an output channel.
  proc broadcast[T] (chan:oschannel[T], v:T) {
    broadcast (C_hack::cast[schannel[T]] chan, v);
  }

  // Very high power though not very efficient conversion
  // from ischannel to iterator.
  // Given i: ischannel[T] you can just write
  // for j in i do .. done
  gen iterator[T] (i:ischannel[T]) () : opt[T] = {
  next:>
    var y = None[T];
    frun { var x = read i; y = Some x; };
    match y do
    | Some _ => yield y; goto next;
    | None => return y;
    done
  }

  // Here is a subroutine call, assuming the
  // fibre is already created
  inline gen subcall[r,w] (chout:%>w, chin:%<r) (arg:w):r = {
    write (chout,arg);
    return read chin;
  }

  // Now, we can use the channels AS a function:
  inline fun apply[r,w] (ch:(%>w * %<r), arg:w):r => subcall ch arg ;
}
3 Synchronous multiplexor
The following device acts like a select, that is, the reader gets all the input data, but the order is indeterminate.
[Not clear how this is useful .. ]
share/lib/std/control/mux.flx
//$ Schannel multiplexor.
//$ Read multiple input schannels, write to an output schannel.
open class Multiplexor
{
  //$ Schannel copy.
  // Loops forever, reading one item from i and writing it to o.
  noinline proc copy[T] (i:ischannel[T],o:oschannel[T]) ()
  {
    while true do
      var x = read i;
      write (o,x);
    done
  }

  //$ Schannel multiplexor based on iterator argument.
  //$ Accepts stream of input schannels.
  //$ Writes to output schannel.
  // Spawns one copy fibre per input channel produced by inp.
  proc mux[T] (inp:1->opt[ischannel[T]], out:oschannel[T]) ()
  {
    for i in inp do
      spawn_fthread$ copy(i,out);
    done
  }

  //$ Schannel multiplexor based on streamable data structure.
  //$ Creates stream of input schannels.
  //$ Writes to output schannel.
  fun mux[C,T with Streamable[C,ischannel[T]]] (a:C, out:oschannel[T]) =>
    mux (iterator a, out)
  ;
}
4 Schannel and Pipe syntax
Special syntax for both pipes and also abbreviation for schannel types.
share/lib/std/control/schannels.flx
// Duplex channels: a single underlying schannel viewed as "read r, write w".
open class DuplexSchannels
{
  _gc_pointer type duplex_schannel[r,w] = "::flx::rtl::schannel_t*";

  // Read an r by viewing the duplex channel as an input channel of r.
  inline gen read[r,w] (chan:duplex_schannel[r,w]) : r =>
    read (C_hack::cast[ischannel[r]] chan)
  ;

  // Write a w by viewing the duplex channel as an output channel of w.
  inline proc write[r,w] (chan:duplex_schannel[r,w], v:w) =>
    write (C_hack::cast[oschannel[w]] chan, v)
  ;

  // Construct a duplex channel on a fresh underlying schannel.
  ctor[r,w] duplex_schannel[r,w] () =>
    C_hack::cast[duplex_schannel[r,w]] #_schannel
  ;

  // NOTE: assuming the mainline wants to read an r
  // after passing a w to the subroutine, it must
  // use the second channel of the pair to do so,
  // passing the first one to the subroutine.
  // Both components are casts of the same underlying channel c.
  gen mk_duplex_schannel_pair[r,w] () =>
    let c = #_schannel in
    C_hack::cast[duplex_schannel[w,r]] c,
    C_hack::cast[duplex_schannel[r,w]] c
  ;

  // Here is our subroutine call, assuming the
  // fibre is already created
  inline gen subcall[r,w] (ch:duplex_schannel[r,w]) (arg:w):r =
  {
    write (ch,arg);
    return read ch;
  }

  // Now, we can use the duplex channel AS a function:
  inline fun apply[r,w] (ch:duplex_schannel[r,w], arg:w):r =>
    subcall ch arg
  ;

  // Here is a self contained subcall that spawns the fibre
  // and creates the channel too. This model is for a one shot.
  inline gen subcall[r,w] (fib: duplex_schannel[w,r] -> 1 -> 0) (arg: w) : r =
  {
    var wr,rw = mk_duplex_schannel_pair[r,w]();
    spawn_fthread$ fib wr;   // the subroutine gets the [w,r] view
    write (rw,arg);          // the caller keeps the [r,w] view
    return read rw;
  }

  inline gen apply[r,w] (
    fib: duplex_schannel[w,r] -> 1 -> 0,
    arg: w
  ) : r =>
    subcall fib arg
  ;
} // class DuplexSchannels
Let's now rewrite our example:
test/regress/rt/subrout-02.flx
// Subroutine fibre: read one int from the duplex channel and write back
// its string form on the same channel.
proc int_to_string (ch: %<int%>string) ()
{
  var n = read ch;
  var text = n.str;
  write (ch, text);
}

// Build the two views of one duplex channel, hand the first to the
// subroutine fibre, then use the second as a function.
var wr, rw = mk_duplex_schannel_pair[string,int]();
spawn_fthread$ int_to_string wr;
println$ rw 42;
test/regress/rt/subrout-02.expect
42
Even more compactly:
test/regress/rt/subrout-03.flx
// Subroutine fibre: read one int from the duplex channel and write back
// its string form on the same channel.
proc int_to_string (ch: %<int%>string) ()
{
  var n = read ch;
  var text = n.str;
  write (ch, text);
}

// Apply the fibre procedure directly as a function.
println$ int_to_string 42;
test/regress/rt/subrout-03.expect
42