ExpandCollapse

+ 1 Fibres (fthreads)

share/lib/std/control/fibres.flx

  
  Low level management of Felix fthreads (fibres).
  open class Fibres
  {
    private gen _start[t]: (t->0)*t->cont = "$1->clone()->call(0,$2)";
  
    Function to start a continuation with argument type t.
    gen start[t] (p:t->0) (x:t) = { return _start (p,x); }
  
    // Clone the procedure closure $1 and call it with no argument, giving a
    // value of type cont. Declared `gen` (as the argument-taking `_start`
    // is) because every evaluation clones a new closure object, so the
    // expression must not be treated as a pure function.
    private gen _start0: (1->0)->cont = "$1->clone()->call(0)";
  
    Function to start a continuation without an argument.
    gen start (p:1->0) = { return _start0 (p); }
    gen _continuation_start (p:1->0) = { return _start0 (p); } // compiler looks this up do not change
  
    Function to make a fibre out of a continuation.
    gen mk_thread: cont->fthread = "new(*ptf-> gcp,::flx::rtl::_fthread_ptr_map,false) ::flx::rtl::fthread_t($1,nullptr)";
  
    // Spawn a fibre on this fibres scheduler.
    // uses a supervisor call so can't be used in a function
    proc spawn_fthread(p:1->0)
    {
        var con = start p;              // get continuation of p
        // wrap the continuation in an fthread object
        var fthr = mk_thread con;
        // pass the fthread to the scheduler with the svc_spawn_fthread
        // service request
        svc$ svc_spawn_fthread fthr;
    }
  
    proc _svc_fthread (fthr:fthread) {
        svc$ svc_spawn_fthread fthr;
    }
  
    // Same steps as spawn_fthread(p), except that the fthread is handed over
    // with the svc_schedule_fthread service request instead of
    // svc_spawn_fthread.
    // NOTE(review): the scheduling difference between the two requests is
    // defined by the scheduler, not visible here -- confirm before relying
    // on ordering.
    proc schedule_fthread(p:1->0)
    {
        var con = start p;              // get continuation of p
        // wrap the continuation in an fthread object
        var fthr = mk_thread con;
        svc$ svc_schedule_fthread fthr;
    }
  
    proc suicide: 1 = "throw (con_t*)NULL;";
  
    proc chain : cont = "return $1;" requires property "heap_closure";
  
    // *********************************************************
    // NESTED SYNC SCHEDULER
    // NOTE: deprecated in favour of async scheduler below
    // *********************************************************
    The type of a fibre scheduler.
    type fibre_scheduler = "::flx::run::sync_sched*" requires header '#include "flx_sync.hpp"';
  
    Construct a fibre scheduler.
     NOTE: NOW GARBAGE COLLECTED!
    ctor fibre_scheduler: bool = """
      new(*ptf-> gcp,::flx::run::sync_sched_ptr_map,false) 
        ::flx::run::sync_sched
        (
          $1, 
          ptf-> gcp, 
          new(*ptf-> gcp, ::flx::run::fthread_list_ptr_map, false) ::flx::run::fthread_list(ptf-> gcp)
        )
      """
    ;
    ctor fibre_scheduler () =>
      fibre_scheduler (Env::getenv "FLX_DEBUG_DRIVER" != "")
    ;
       
  
    Spawn a fibre on a given scheduler with a given continuation.
    Note: does NOT run it!
    FIXME: no mutex guard!!
    proc spawn_fibre: fibre_scheduler * fthread = """
      $1->active->push_back($2);
    """;
  
    proc frun: (1->0) = "::flx::rtl::executil::frun (ptf-> gcp, $1);" 
      requires header '#include "flx_executil.hpp"'
    ;
  
    proc run: fibre_scheduler = "$1->frun();";
  
    // Convenience driver: construct a sync scheduler with the default
    // constructor, queue p on it as a fibre, then run that scheduler.
    proc run (p: 1 -> 0) {
      var sched = fibre_scheduler();
      spawn_fthread sched p;
      run sched;
    }
  
   
    The type of the stop state of the fibre scheduler.
    terminated: the scheduler is terminated.
    blocked: the scheduler is out of threads to run.
    delegated: the scheduler has been issued a service
     request by a thread which it cannot satisfy.
     The scheduler is put in delegated state and awaits
     for another service to satisfy the request and put
     it back in operation.
    Note: there is no "operating" state because the
    stop state can only be queried by the scheduler's caller
    when the scheduler returns control to it.
    enum fibre_scheduler_state {
      terminated, 
      blocked,   
      delegated  
    };
    fun get_state : fibre_scheduler -> fibre_scheduler_state = "$1->fs";
  
  
    Core user procedure for launching a fibre.
    proc spawn_fthread (fs:fibre_scheduler) (p:1->0) { spawn_fibre (fs,p.start.mk_thread); }
  
    // *********************************************************
    // ASYNC SCHEDULER
    // *********************************************************
    // FIXME: it is leaked .. to be fixed shortly
  
    // async scheduler type
    type async_scheduler = "::flx::run::async_sched*" 
      requires header '#include "flx_async.hpp"',
      package "flx_arun"
    ;
  
    // async scheduler constructor
    ctor async_scheduler: bool = """
      new 
      ::flx::run::async_sched
          (
            ptf-> world, // world object
            $1, // debug driver flag 
            ptf-> gcp,  // GC profile object
            new(*ptf-> gcp, ::flx::run::fthread_list_ptr_map, false) ::flx::run::fthread_list(ptf-> gcp),
            ::flx::run::async_sched::mainline // temporary hack! thread kind (should be inherited)
          )
        """
      ;
  
    // async scheduler constructor wrapper
    ctor async_scheduler () =>
      async_scheduler (Env::getenv "FLX_DEBUG_DRIVER" != "")
    ;
  
    // spawn fibre on async scheduler from fthread object
    proc spawn_fibre: async_scheduler * fthread = """
        $1->ss->active->push_back($2);
    """;
  
    // spawn fibre on async scheduler from procedure
    proc spawn_fthread (fs:async_scheduler) (p:1->0) { spawn_fibre (fs,p.start.mk_thread); }
  
    proc prun: async_scheduler = "$1->prun();";
  
  
    // Convenience driver: construct an async scheduler with the default
    // constructor, queue p on it as a fibre, then call prun on it.
    proc async_run (p: 1 -> 0) {
      var sched = async_scheduler();
      spawn_fthread sched p;
      prun sched;
    }
  
  // *********************************************************
  // MISC STUFF THAT MAY NOT BE USED, CONSIDER DELETING IT
  // UNRELIABLE ANYHOW .. CHECK PLUGINS ... 
  // *********************************************************
  
  
    Execute a single step of a fibre.
    gen step: cont -> cont = "$1->resume()";
  
    Schedule death of a fibre.
    proc kill: fthread = "$1->cc = 0;";
  
    Run a continuation until it terminates.
    Do not use this proc if the underlying
    procedure attempts to read messages.
    This is a low level primitive, bypassing fthreads.
    proc run: cont = "::flx::rtl::executil::run($1);" requires package "flx_executil";
  
    // C++ primitive behind send[t].
    //   $1 : pointer to the variable holding the target continuation
    //   $2 : the message value, of type t
    // Steps performed by the embedded code:
    //   1. resume the continuation repeatedly while it has no pending
    //      service request or the pending request is svc_yield; a con_t*
    //      thrown during resume replaces the current continuation;
    //   2. throw flx_exec_failure_t if the continuation ended, has no
    //      pending request, or the request is not svc_read;
    //   3. copy $2 through the request's data pointer and clear the request;
    //   4. resume again under the same loop condition as step 1;
    //   5. write the resulting continuation back through $1.
    // The message type is not checked against what the reader expects.
    private proc _send[t]: &cont * t =
    """
    {
      using namespace ::flx::rtl;
      con_t *tmp = *(con_t**)$1.get_data();
      // run target until it reaches a service request (or death)
      while(tmp && (!tmp->p_svc || tmp->p_svc->variant == svc_yield)) {
        try { tmp=tmp->resume(); }
        catch (con_t *x) { tmp = x; }
      }
      // check it is alive and making the expected service request
      if (!tmp)
        throw flx_exec_failure_t (__FILE__,"send","Send to terminated procedure");
      if (!tmp->p_svc)
        throw flx_exec_failure_t (__FILE__,"send","Send to unready Procedure");
      if (tmp->p_svc->variant != svc_read)
        throw flx_exec_failure_t (__FILE__,"send","Send to Procedure which is not trying to read");
      // store the message
      **(?1**)tmp->p_svc->data= $2;
      // clear the service request
      tmp->p_svc = 0;
      // run the target until the next service request (or death)
      while(tmp && (!tmp->p_svc || tmp->p_svc->variant == svc_yield)) {
        try { tmp=tmp->resume(); }
        catch (con_t *x) { tmp = x; }
      }
      // save the new continuation
      *(con_t**)$1.get_data() = tmp;
  
    }
    """;
  
    Send a message to a continuation.
    There is no type checking on the message type.
    The procedure is executed until
    the next wait_state, then the message is stored.
    Low level primitive, bypassing fthreads.
    proc send[t] (p:&cont) (x:t)
    {
      _send (p,x);
    }
  
  }
  

+ 2 Synchronous Channels

share/lib/std/control/schannels.flx

  
  Synchronous Channels.
  Used to exchange control and possibly data
  between Felix f-threads (aka fibres).
  
  open class Schannel 
  {
    // SCHANNEL PRIMITIVE
    _gc_pointer type _schannel = "::flx::rtl::schannel_t*";
  
    // PRIMITIVE CONSTRUCTOR
    ctor _schannel: 1 = 
      "new(*ptf-> gcp,::flx::rtl::schannel_ptr_map,false) ::flx::rtl::schannel_t()"
      requires property "needs_gc"
    ;
  
    // THEORETICALLY UNSOUND 
    // Produce a null _schannel. The cast names ::flx::rtl::schannel_t, the
    // C++ type that _schannel and the typed channel types are bound to.
    gen mk_null_channel : 1 -> _schannel = "(::flx::rtl::schannel_t*)NULL";
    fun isNULL: _schannel -> bool = "!$1"; 
  
    // READ PRIMITIVE
    inline proc read_address (chan:_schannel, loc: &address) {
      svc$ svc_sread$ chan, loc;
      //println$ "read_address: " + loc->str;
    } 
  
    // WRITE PRIMITIVE
    inline proc write_address (chan:_schannel, var v:address) { 
      //println$ "write_address: " + v.str;
      svc$ svc_swrite$ chan, &v;
    }
  
    // TYPED CHANNELS
    The type of a bidirectional synchronous channel.
    _gc_pointer type schannel[t] = "::flx::rtl::schannel_t*";
  
    The type of an input synchronous channel.
    _gc_pointer type ischannel[t] = "::flx::rtl::schannel_t*";
  
    The type of an output synchronous channel.
    _gc_pointer type oschannel[t] = "::flx::rtl::schannel_t*";
  
    Create a bidirectional synchronous channel.
    gen mk_schannel[t]():schannel[t] => 
      C_hack::cast[schannel[t]] #_schannel
    ;
  
    ctor[T] address: oschannel[T] = "$1";
    ctor[T] address: ischannel[T] = "$1";
  
    Safe cast from bidirectional to output synchronous channel.
    ctor[t] oschannel[t](x:schannel[t]) => C_hack::cast[oschannel[t]] x;
  
    Safe cast from bidirectional to input synchronous channel.
    ctor[t] ischannel[t](x:schannel[t]) => C_hack::cast[ischannel[t]] x;
  
    Make an input and an output channel out of a bidirectional channel.
    gen mk_ioschannel_pair[t](var ch:schannel[t]) =>
      ischannel[t] ch, oschannel[t] ch
    ;
  
    Construct a connected input and output channel pair.
    gen mk_ioschannel_pair[t]() =>
      mk_ioschannel_pair[t]$ mk_schannel[t] ()
    ;
  
  
    // READ POINTER
    inline proc read_pointer[T] (chan:ischannel[&T], p: &&T) {
      read_address$ C_hack::cast[_schannel] chan, C_hack::cast[&address] p;
    } 
  
    // WRITE POINTER
    inline proc write_pointer[T] (chan:oschannel[&T], var p:&T) { 
      write_address$ C_hack::cast[_schannel] chan, p.address;
    }
  
  
  
    // pass in address of location to put the pointer to the T data
    proc read[T] (chan:schannel[T], loc: &&T) {
      svc$ svc_sread$ C_hack::cast[_schannel] chan, C_hack::reinterpret[&root::address] (loc);
    }
  
    // pass in address of location to put the T data
    // Read one item from chan and store it through p.
    // Uses the pointer-returning read overload to obtain the address of the
    // transferred item, then copies the pointed-to value into *p.
    proc read[T] (chan:schannel[T], p: &T) {
      var loc: &T;
      // loc is set by the read to point at the transferred item
      read (chan, &loc);
      // copy the item into the caller's storage
      p <- *loc;
    }
  
    Read an item from a bidirectional channel.
    inline gen read[T] (chan:schannel[T]) = {
      var loc: &T;
      read (chan, &loc);
      return *loc;
    }
    proc read[T] (chan:ischannel[T], loc: &&T) { read (C_hack::cast[schannel[T]] chan, loc); }
    proc read[T] (chan:ischannel[T], p: &T) { read (C_hack::cast[schannel[T]] chan, p); }
  
  
    Read an item from an input channel.
    inline gen read[T] (chan:ischannel[T]) => read$ C_hack::cast[schannel[T]] chan;
  
    Test if channel is ready for a read.
    // True when the channel's waiting list is non-empty and the low bit of
    // its `top` pointer is set.
    // Both overloads use the same test: an ischannel is a cast of the same
    // runtime channel object as an schannel, so they must agree.
    // The expressions are fully parenthesised; the previous strings had an
    // unmatched ')' and, in the ischannel case, a '!' that applied to the
    // cast rather than to the bit test.
    // NOTE(review): this assumes a set low bit on `top` marks waiting
    // writers (i.e. data available to a reader) -- confirm against
    // schannel_t in the runtime.
    inline gen ready[T] :ischannel[T] -> bool = "($1->top!=nullptr && (((uintptr_t)$1->top & 1u) != 0))";
    inline gen ready[T] : schannel[T] -> bool = "($1->top!=nullptr && (((uintptr_t)$1->top & 1u) != 0))";
  
    Return Some value if ready, otherwise None
    inline gen maybe_read[T] (chan:ischannel[T]) =>
      if chan.ready then Some chan.read else None[T]
    ;
  
    inline gen maybe_read[T] (chan:schannel[T]) =>
      if chan.ready then Some chan.read else None[T]
    ;
  
    Write an item to a bidirectional channel.
    // chan : channel to write on
    // v    : value to send
    proc write[T] (chan:schannel[T], v:T) {
      // copy v into a new heap object and keep its address as an untyped
      // address
      var ps = C_hack::cast[root::address]$ new v;
      // issue the svc_swrite service request with the untyped channel and a
      // pointer to the address variable
      svc$ svc_swrite$ C_hack::cast[_schannel] chan, &ps;
    }
  
    proc write[T] (chan:oschannel[T], v:T) { 
      write (C_hack::cast[schannel[T]] chan, v); 
    }
   
    Multi Write an item to a bidirectional channel.
    proc broadcast[T] (chan:schannel[T], v:T) {
      var ps = C_hack::cast[root::address]$ new v;
      svc$ svc_multi_swrite$ C_hack::cast[_schannel] chan, &ps;
    }
   
    Multi Write an item to an output channel.
    proc broadcast[T] (chan:oschannel[T], v:T) { 
      broadcast (C_hack::cast[schannel[T]] chan, v); 
    }
  
    // Very high power though not very efficient conversion
    // from ischannel to iterator.
    // Given i: ischannel[T] you can just write
    // for j in i do .. done
    gen iterator[T] (i:ischannel[T]) () : opt[T] = {
    next:>
      // y stays None unless the read below completes
      var y = None[T];
      // run the read through frun; y is only assigned after read returns
      frun { var x = read i; y = Some x; };
      match y do
      // a value arrived: yield it, then loop for the next one
      | Some _ => yield y; goto next;
      // frun returned without the read completing: end the iteration
      | None => return y;
      done
    }
  
    // Here is a subroutine call, assuming the
    // fibre is already created
    inline gen subcall[r,w] (chout:%>w, chin:%<r) (arg:w):r =
    {
      write (chout,arg);
      return read chin;
    }
  
    // Now, we can use the channels AS a function:
    inline fun apply[r,w] (ch:(%>w * %<r), arg:w):r =>
      subcall ch arg
    ;
  
  }
  

+ 3 Synchronous multiplexor

The following device acts like a select, that is, the reader gets all the input data, but the order is indeterminate.

[Not clear how this is useful .. ]

share/lib/std/control/mux.flx

  
  Schannel multiplexor.
  Read multiple input schannels, write to an output schannel.
  open class Multiplexor
  {
    Schannel copy.
    noinline proc copy[T] (i:ischannel[T],o:oschannel[T]) () 
    {
      while true do 
        var x = read i;
        write (o,x);
      done
    }
  
    Schannel multiplexor based on iterator argument.
    Accepts stream of input schannels.
    Writes to output schannel.
    proc mux[T] (inp:1->opt[ischannel[T]], out:oschannel[T]) ()
    {
      for i in inp do 
        spawn_fthread$ copy(i,out); 
      done 
    }
  
  
    Schannel multiplexor based on streamable data structure.
    Creates stream of input schannels.
    Writes to output schannel.
    fun mux[C,T with Streamable[C,ischannel[T]]] (a:C, out:oschannel[T]) =>
      mux (iterator a, out)
    ;
  }

+ 4 Schannel and Pipe syntax

Special syntax for both pipes and also abbreviation for schannel types.

share/lib/std/control/schannels.flx

  
  open class DuplexSchannels
  {
  _gc_pointer type duplex_schannel[r,w] = "::flx::rtl::schannel_t*";
  
  inline gen read[r,w] (chan:duplex_schannel[r,w]) : r =>
    read (C_hack::cast[ischannel[r]] chan)
  ;
  
  inline proc write[r,w] (chan:duplex_schannel[r,w], v:w)  =>
    write (C_hack::cast[oschannel[w]] chan, v)
  ;
  
  ctor[r,w] duplex_schannel[r,w] () =>
    C_hack::cast[duplex_schannel[r,w]] #_schannel
  ; 
  
  // Create one underlying channel and return it as a pair of views:
  // first  duplex_schannel[w,r]  (reads w, writes r),
  // second duplex_schannel[r,w]  (reads r, writes w).
  // NOTE: assuming the mainline wants to read an r
  // after passing a w to the subroutine, it must
  // use the second channel of the pair to do so,
  // passing the first one to the subroutine.
  gen mk_duplex_schannel_pair[r,w] () =>
    let c = #_schannel in
    C_hack::cast[duplex_schannel[w,r]] c,
    C_hack::cast[duplex_schannel[r,w]] c
  ;
  
  // Here is our subroutine call, assuming the
  // fibre is already created
  inline gen subcall[r,w] (ch:duplex_schannel[r,w]) (arg:w):r =
  {
    write (ch,arg);
    return read ch;
  }
  
  // Now, we can use the duplex channel AS a function:
  inline fun apply[r,w] (ch:duplex_schannel[r,w], arg:w):r =>
    subcall ch arg
  ;
  
  // Here is a self contained subcall that spawns the fibre
  // and creates the channel too. This model is for a one shot.
  inline gen subcall[r,w] 
    (fib: duplex_schannel[w,r] -> 1 -> 0)
    (arg: w)
  : r =
  {
    var wr,rw = mk_duplex_schannel_pair[r,w]();
    spawn_fthread$ fib wr;
    write (rw,arg);
    return read rw;
  }
  
  inline gen apply[r,w] (
    fib: duplex_schannel[w,r] -> 1 -> 0,
    arg: w)
  : r =>
    subcall fib arg
  ;
  
  } // class DuplexSchannels
  

Let's now rewrite our example:

test/regress/rt/subrout-02.flx

  // Subroutine fibre: reads an int from ch, converts it to a string and
  // writes the string back on the same channel.
  proc int_to_string (ch: %<int%>string)  ()
  {
    var x = read ch;
    var r = x.str;
    write(ch, r);
  }
  // wr goes to the subroutine; rw is kept by the mainline
  var wr, rw = mk_duplex_schannel_pair[string,int]();
  spawn_fthread$ int_to_string wr;
  // applying the channel to 42 writes 42 and reads back the reply
  println$ rw 42;

test/regress/rt/subrout-02.expect

42

Even more compactly:

test/regress/rt/subrout-03.flx

  // Subroutine fibre: reads an int from ch, converts it to a string and
  // writes the string back on the same channel.
  proc int_to_string (ch: %<int%>string)  ()
  {
    var x = read ch;
    var r = x.str;
    write(ch, r);
  }
  // The procedure itself is applied to the argument.
  // NOTE(review): presumably resolved through the DuplexSchannels apply
  // overload that takes the fibre procedure, creates the channel pair and
  // spawns the fibre -- confirm.
  println$ int_to_string 42;

test/regress/rt/subrout-03.expect

42