
+ 1 Continuations

share/lib/rtl/rt/con.hpp

#ifndef CON
#define CON
// continuation
#include <cstddef>
using size_t = ::std::size_t;

struct fibre_t;
struct con_t {
  fibre_t *fibre;
  int pc;        // program counter
  con_t () : fibre(nullptr), pc(0) {} // initialisers in declaration order
  virtual con_t *return_control()=0;
  virtual con_t *resume()=0;
  virtual size_t size()const=0;
  virtual ~con_t(){}
};

// coroutine
struct coroutine_t : con_t {
  con_t *return_control(); // deletes object and returns nullptr
};
 
// top level subroutine
struct subroutine_t : con_t {
  subroutine_t(fibre_t *f) { fibre = f; }
  con_t *caller; // caller continuation
  con_t *return_control(); // deletes object and returns caller
};

// NOTE: a subroutine must not delete itself if it returns
// a nested procedure which binds to its local variable frame

struct curry_subroutine_t : con_t {
  con_t *caller; // caller continuation
  curry_subroutine_t (fibre_t *f) { fibre = f; }

  con_t *return_control() {
    auto tmp = caller;
    caller = nullptr;
    return tmp;
  }
};
#endif
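
Before moving on, here is a minimal hand-written continuation, a sketch only: a coroutine that prints a line on each of its first two resumptions, then dies. The library itself generates exactly this shape of code with the CSP_* macros defined later in csp.hpp.

#include <cassert>
#include <iostream>
#include "con.hpp"

// sketch: a two-step coroutine using the pc-switch protocol
struct ticker : coroutine_t {
  con_t *resume() override {
    switch(pc++) {
      case 0:
        ::std::cerr << "tick" << ::std::endl;
        return this;             // still alive: scheduler resumes at case 1
      case 1:
        ::std::cerr << "tock" << ::std::endl;
        return return_control(); // coroutine dies: frees itself, returns nullptr
      default:
        assert(false); return nullptr;
    }
  }
  size_t size() const override { return sizeof(ticker); }
};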

+ 2 Fibres

share/lib/rtl/rt/fibre.hpp

#ifndef FIBRE
#define FIBRE
// fibre3.hpp
// fibre
#include "con.hpp"
struct csp_process_t;
union svc_req_t;

struct fibre_t {
  con_t *cc;
  fibre_t *next;
  csp_process_t *process;
  svc_req_t *svc_req; // request

  // default DEAD
  //fibre_t() : cc(nullptr), next(nullptr), process(nullptr) {}

  // construct from continuation
  fibre_t(con_t *ccin, csp_process_t *owned_by) : 
    cc(ccin), next (nullptr), process(owned_by), svc_req(nullptr)
  {
    ccin->fibre=this; 
  }

  // immobile
  fibre_t(fibre_t const&)=delete;
  fibre_t& operator=(fibre_t const&)=delete;

  // destructor deletes any remaining continuations in spaghetti stack
  ~fibre_t(); // defined in csp.hpp to resolve circular reference
  
  // run until either fibre issues a service request or dies
  svc_req_t *run_fibre() { 
    while(cc) {
      cc=cc->resume(); 
      if(cc && svc_req) return svc_req;
    }
    return nullptr;
  }
};
#endif
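
A fibre is driven by repeatedly calling run_fibre and servicing the requests it returns. A minimal sketch of the driving loop follows; the real scheduler is in csp_thread.hpp below, and alloc, proc, cont and dispatch are assumed names, not library API.

fibre_t *current = new(alloc) fibre_t(cont, proc); // cont: initial continuation
while(current) {
  current->svc_req = nullptr;
  if(svc_req_t *req = current->run_fibre())
    dispatch(req, &current);  // hypothetical: perform the service request
  else {                      // fibre returned without a request: it is dead
    delete_concrete_object(current, proc->process_allocator);
    current = proc->pop();    // fetch more work from the active set
  }
}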

+ 3 Channel Design

A channel is a lightweight synchronisation device which allows fibres to perform two operations: read and write. When a fibre owning the channel performs a read and there is a writer on the channel, the data is transferred, the writer is removed from the channel, and both the reader and the writer become active.

If there is no writer on the channel, the reader is suspended and added to the channel: the channel now owns the fibre.

The write operation is dual.

If no one owns a channel, the channel is deleted along with all the fibres it owns. Similarly, if no one owns a fibre, it is deleted, and its shared ownership of any channels it holds is relinquished. Together these rules can lead to a recursive deletion of many channels and fibres.

For example, in a pipeline, if a fibre in the middle decides to quit, it is deleted; the components on either side of it can then no longer write or read, so they are deleted as well, and the effect cascades and destroys the whole pipeline.

To make this work, we have to use a double reference counting scheme. Objects called channel endpoints reference count the channel they are endpoints for. In turn, each endpoint is accessed by a smart pointer which reference counts it and can be passed around the continuations of the fibre. The fibre itself must maintain an array of all the endpoints it owns, the registry.

When a fibre suspends on an I/O operation this is what happens: first, the fibre is removed from the scheduler, and added to the channel. We use a linked list in the channel. The reference count of the fibre doesn't change at this point, since we simply moved ownership to the channel.

Next, we have to decrement the reference count of all the registered endpoints, since none of them can do any work while the owning fibre is inactive. When the fibre is resumed, all these reference counts must be incremented again.

When the reference count of a channel endpoint would be decremented to zero, the endpoint is deleted, which causes the reference count of the channel it refers to to be decremented.

And clearly, when the channel reference count goes to zero, it is deleted too. When the channel is deleted, all the fibres owned by the channel are also deleted, since none of them can ever become active again: they are waiting on I/O on a channel which no longer exists.

Deleting a fibre requires deleting all the continuations in its stack. As this happens, references to channel endpoints are deleted, decrementing the endpoint reference counts, until only the registry of the fibre remains. When the references in the registry are deleted all the endpoints will be deleted, which will decrement, and possibly delete, the channel referred to.

The implementation is not quite trivial, since it is recursive and cycles must be broken to prevent an infinite recursion. This is done by marking the channel when deletion is in progress, which stops attempts to delete the owned fibres (since they're already being deleted). In fact, we use the channel refcount itself as the mark: if the refcount is zero, deletion of the channel is in progress.

Note that channels are shared between fibres, but an endpoint must be exclusively owned by a single fibre. Smart pointers to a channel endpoint can be copied, assigned, and moved around, but only within continuations of the owning fibre.

When a fibre is spawned it must be passed references to channel endpoints, which is done by constructing its registry and passing that. It is vital that the spawner release all the endpoints it is not allowed to own, and as soon as possible, because while it holds them the referred-to channel is still considered reachable. In particular this must happen before the spawner's next I/O operation, because these endpoints are not registered in the spawner's registry: if it suspended, their reference counts would not be decremented as required.

One should note this is not trivial. When a fibre wants to spawn several others and connect them with channels, it must hold the appropriate endpoint array for each fibre it will spawn until the spawn is done. On the other hand, since the first spawned fibre might start immediately and immediately do I/O, the endpoints at the other ends of its channels must be held long enough to ensure that fibre does not immediately suicide.

Another point to note is that a channel must have a reference count of at least 2 to be useful: one for each of two endpoints held by different active fibres.

Finally, note that any number of fibres can subscribe to a channel. Channels are untyped and can operate in either direction. Similarly, endpoints are not intrinsically input or output. A channel is, of course, doomed if there is only one subscriber.
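
As a concrete sketch of the scheme, using the API defined below and assuming alloc is an alloc_ref_t in scope:

chan_epref_t left = make_sequential_channel(alloc); // channel refcnt = 1
chan_epref_t right = left->dup();                   // channel refcnt = 2
// left and right are now handed to two different fibres; each fibre
// records its endpoint in its registry. When a copy of an endpoint
// reference is destroyed the endpoint refcnt drops; when the last
// copy goes, the endpoint dies and the channel refcnt drops with it.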

share/lib/rtl/rt/channel.hpp

#ifndef CHANNEL
#define CHANNEL
#include <memory>
#include <cstdint>
#include <atomic>
#include <iostream>
using uintptr_t = ::std::uintptr_t;
#include "fibre.hpp"
#include "allocator.hpp"

struct channel_endpoint_t;

// CHANNEL ABSTRACTION

// low bit fiddling routines: the low bit of the fibre pointer stored
// in a channel's top slot tags a waiting fibre as a writer (set) or
// a reader (clear); this requires fibre_t alignment of at least 2
inline static bool get_lowbit(void *p) { 
  return (uintptr_t)p & (uintptr_t)1u; 
}
inline static void *clear_lowbit(void *p) { 
  return (void*)((uintptr_t)p & ~(uintptr_t)1u); 
}
inline static void *set_lowbit(void *p) { 
  return (void*)((uintptr_t)p | (uintptr_t)1u); 
}

// channel
struct channel_t {
  ::std::atomic_size_t refcnt;
  fibre_t *top;
  
  channel_t () : refcnt(0), top(nullptr) {}
  virtual ~channel_t(){ 
    //::std::cerr << "channel " << this << " destructor" << ::std::endl; 
  }

  // immobile object
  channel_t(channel_t const&)=delete;
  channel_t& operator= (channel_t const&)=delete;

  // required for the polymorphic deleter
  virtual size_t size() const = 0;

  virtual void signal()=0;

  // channel read operation
  virtual void read(void **target, fibre_t **pcurrent)=0;

  // channel write operation
  virtual void write(void **source, fibre_t **pcurrent)=0;

protected:
  // basic push and pop operations, not thread safe

  void st_push_reader(fibre_t *r) { 
    r->next = top; 
    top = r; 
  }

  void st_push_writer(fibre_t *w) { 
    w->next = top; 
    top = (fibre_t*)set_lowbit(w);
  }

  fibre_t *st_pop_reader() { 
    fibre_t *tmp = top; 
    if(!tmp || get_lowbit(tmp))return nullptr;
    top = top -> next;
    return tmp; // lowbit is clear, its a reader 
  }

  fibre_t *st_pop_writer() { 
    fibre_t *tmp = top; 
    if(!tmp || !get_lowbit(tmp)) return nullptr;
    tmp = (fibre_t*)clear_lowbit(tmp); // lowbit is set for writer
    top = tmp -> next;
    return tmp;
  }

};

struct chan_epref_t {
  channel_endpoint_t *endpoint;
  chan_epref_t(channel_endpoint_t *p) : endpoint(p) {} // endpoint ctor sets refcnt to 1 initially

  channel_endpoint_t *operator->()const { return endpoint; }
  channel_endpoint_t *get() const { return endpoint; }

  chan_epref_t() : endpoint(nullptr) {}

  // rvalue move
  chan_epref_t(chan_epref_t &&p) {
    if(p.endpoint) { 
      endpoint = p.endpoint; 
      p.endpoint = nullptr; 
    }
    else endpoint = nullptr;
  } 

  // lvalue copy
  chan_epref_t(chan_epref_t &p);

  // rvalue assign
  void operator= (chan_epref_t &&p) {
    if (&p!=this) { // ignore self assign
      this->~chan_epref_t(); // destroy target
      new(this) chan_epref_t(::std::move(p)); // move source to target
    }
  } // rass

  // lvalue assign
  void operator= (chan_epref_t &p) { 
    if(&p!=this) { // ignore self assign
      this->~chan_epref_t(); // destroy target
      new(this) chan_epref_t(p); // copy source to target
    }
  } // lass
 
 
  // destructor
  // uses allocator passed to endpoint on construction to delete it
  ~chan_epref_t();
};


// the allocator used to construct the endpoint
// must be passed to it so it can be used to delete it
// it MUST be the same allocator used to construct the channel
// try to FIXME so it is .. this requires storing the allocator in
// the channel OR ensuring all channels and endpoints are constructed
// using the system allocator
struct channel_endpoint_t {
  size_t refcnt;
  channel_t *channel;
  alloc_ref_t allocator;
  channel_endpoint_t(channel_t *p, alloc_ref_t a) : refcnt(1), channel(p), allocator(a) { ++channel->refcnt; }

  // immobile object
  channel_endpoint_t(channel_endpoint_t const&)=delete;
  channel_endpoint_t& operator= (channel_endpoint_t const&)=delete;

  // create a duplicate of the current endpoint referring
  // to the same channel. Returns a chan_epref_t, a shared
  // pointer to the new endpoint. Increments the counter
  // of endpoints in the channel.
  // note, C++ must construct a single object containing
  // both the reference count and the channel endpoint.

  chan_epref_t dup() { 
    return chan_epref_t(new(allocator) channel_endpoint_t(channel, allocator));
  }

  ~channel_endpoint_t () {
 //::std::cerr << "Channel endpoint " << this << " destructor, channel " <<  channel << ", refcnt " << channel->refcnt.load() << ::std::endl; 
    switch (channel->refcnt.load()) {
      case 0: break;
      case 1: delete_channel(); break;
      default: --channel->refcnt; break;
    }
  }

  void delete_channel() {
 //::std::cerr << "Delete channel " << channel << ", refcnt " << channel->refcnt.load() << ::std::endl;
    fibre_t *top = channel->top;
    channel->top = nullptr;
    channel->refcnt.store(0);
    while (top) {
      fibre_t *f = (fibre_t*)clear_lowbit(top);
      fibre_t *tmp = f->next;
// ::std::cerr << "Channel " << channel << " Deletes fibre " << f << ", next=" << tmp << ::std::endl;
      delete_concrete_object(f, allocator);
      top = tmp;
    }
 //::std::cerr << "Deleting channel " << channel << " now" << ::std::endl;
    delete_csp_polymorphic_object(channel, allocator);
  }
  
};


// lvalue copy
chan_epref_t::chan_epref_t(chan_epref_t &p) { 
  if(p.endpoint) { 
    endpoint = p.endpoint; 
    p.endpoint->refcnt++; 
  }
  else endpoint = nullptr; 
} 

// destructor
// uses allocator passed to endpoint on construction to delete it
chan_epref_t::~chan_epref_t() { 
  if (endpoint) {
    if(endpoint->refcnt == 1) { 
//::std::cerr << "Endpoint ref " << this << " deletes endpoint " << endpoint << ::std::endl;
      delete_concrete_object(endpoint, endpoint->allocator); 
    }
    else 
      --endpoint->refcnt; 
  }
} // dtor

chan_epref_t acquire_channel(alloc_ref_t a, channel_t *p) {
  return chan_epref_t(new(a) channel_endpoint_t(p,a));
}
#endif

+ 4 Service Requests

share/lib/rtl/rt/svc.hpp

#ifndef SVC_HPP
#define SVC_HPP
#include "allocator.hpp"

struct chan_epref_t;
struct con_t;

// service requests
//
// Request codes
enum svc_code_t {
  read_request_code_e,
  write_request_code_e,
  spawn_fibre_request_code_e,
  spawn_fibre_deferred_request_code_e,
  spawn_process_request_code_e,
  spawn_cothread_request_code_e
};

// synchronous I/O requests
struct io_request_t {
  svc_code_t svc_code;
  chan_epref_t *chan;
  void **pdata;
};

// fibre and cothread spawn requests
struct spawn_fibre_request_t {
  svc_code_t svc_code;
  con_t *tospawn;
};

// fibre and cothread spawn requests
struct spawn_process_request_t {
  svc_code_t svc_code;
  con_t *tospawn;
  alloc_ref_t process_allocator;
};


// unified service request type (only used for casts)
union svc_req_t {
  io_request_t io_request;
  spawn_fibre_request_t spawn_fibre_request;
  spawn_process_request_t spawn_process_request;
  svc_code_t get_code () const { return io_request.svc_code; }
};
#endif
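
To see how these structures are used, here is a sketch of the request protocol; it is what the SVC_READ_REQ and SVC macros in csp.hpp expand to. The coroutine reads one datum from an endpoint it holds, then dies; the endpoint inp is assumed to be set up by the spawner.

#include <cassert>
#include "channel.hpp"
#include "svc.hpp"

struct read_once : coroutine_t {
  chan_epref_t inp;
  void *iodata;
  io_request_t r_req;

  con_t *resume() override {
    switch(pc++) {
      case 0:
        r_req.svc_code = read_request_code_e;        // cf. SVC_READ_REQ
        r_req.chan = &inp;
        r_req.pdata = &iodata;
        fibre->svc_req = (svc_req_t*)(void*)&r_req;  // cf. SVC
        return this;  // scheduler performs the read, then resumes us
      case 1:
        // the datum is now in iodata
        return return_control();
      default: assert(false); return nullptr;
    }
  }
  size_t size() const override { return sizeof(read_once); }
};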

+ 5 Sequential Channel

share/lib/rtl/rt/sequential_channel.hpp

#ifndef SEQUENTIAL_CHANNEL
#define SEQUENTIAL_CHANNEL

#include "channel.hpp"
#include "svc.hpp"
#include "csp_process.hpp"

// SINGLE THREADED CHANNEL (no locking)

// channel
struct sequential_channel_t : channel_t {
  
  sequential_channel_t () : channel_t() {}

  size_t size() const override { return sizeof(sequential_channel_t); }
  void signal() override  {} 

  void read(void **target, fibre_t **pcurrent) override  {
//::std::cerr << *pcurrent << " Sequential Channel read begins " << ::std::endl;
    fibre_t *current = *pcurrent;
    fibre_t *w = st_pop_writer();
//::std::cerr << *pcurrent << " Sequential Channel read finds writer " << w << ::std::endl;
    if(w) {
      ++refcnt;
      *target =
        *w->svc_req->io_request.pdata; // transfer data
      w->process->push(w); // onto active list
    }
    else {
      if(refcnt == 1) {
        *pcurrent = current->process->pop(); // active list
//::std::cerr << "channel read " << this << " deletes fibre " << current << ::std::endl;
        delete_concrete_object(current,current->process->process_allocator);
      } else {
        --refcnt;
        st_push_reader(current);
        *pcurrent = current->process->pop(); // active list
      }
    }
  }

  void write(void **source, fibre_t **pcurrent) override  {
//::std::cerr << *pcurrent << " Sequential Channel write begins " << ::std::endl;
    fibre_t *current = *pcurrent;
    fibre_t *r = st_pop_reader();
//::std::cerr << *pcurrent << " Sequential Channel write finds reader " << r << ::std::endl;
    if(r) {
      ++refcnt;
      *r->svc_req->io_request.pdata = *source;

      if(r->process == current->process) {
        current->process->push(current); // current is writer, pushed onto active list
        *pcurrent = r; // make reader current
      }
      else {
        r->process->push(r);
      }
    }
    else {
      if(refcnt == 1) {
        *pcurrent = current->process->pop(); // reset current from active list
//::std::cerr << "channel write " << this << " deletes fibre " << current << ::std::endl;
        delete_concrete_object(current,current->process->process_allocator);
      } else {
        --refcnt;
        st_push_writer(current); // i/o fail: push current onto channel
        *pcurrent = current->process->pop(); // reset current from active list
      }
    }
  }
};

chan_epref_t make_sequential_channel(alloc_ref_t a) {
  return acquire_channel(a, new(a) sequential_channel_t);
}


void system_t::connect_sequential (chan_epref_t *left, chan_epref_t *right) {
  auto chleft = make_sequential_channel(system_allocator);
  auto chright = chleft->dup(); 
  *left = chleft;
  *right = chright;
}
#endif
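
Hypothetical usage of connect_sequential: make one channel and hand its two endpoints to a producer and a consumer fibre (sys is an assumed system_t*).

chan_epref_t wend, rend;
sys->connect_sequential(&wend, &rend);
// pass wend to the producer's setup() and rend to the consumer's;
// the caller's copies are then destroyed, leaving one endpoint per
// fibre and a channel reference count of 2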

+ 6 Concurrent Channel

share/lib/rtl/rt/concurrent_channel.hpp

#ifndef CONCURRENT_CHANNEL
#define CONCURRENT_CHANNEL
#include "allocator.hpp"
#include "system.hpp"
#include "sequential_channel.hpp"

// CONCURRENT CHANNEL (operations are locked, but no async)

struct concurrent_channel_t : sequential_channel_t {
  ::std::atomic_flag lk;
  void lock() { while(lk.test_and_set(::std::memory_order_acquire)); }
  void unlock() { lk.clear(::std::memory_order_release); }

  concurrent_channel_t () : lk(false) {}

  size_t size() const override { return sizeof(concurrent_channel_t); }

  void read(void **target, fibre_t **pcurrent) override {
//::std::cout << "read on " << this << ::std::endl;
    fibre_t *current = *pcurrent;
    lock();
    fibre_t *w = st_pop_writer();
    if(w) {
      ++refcnt;
      unlock();
      *target =
        *w->svc_req->io_request.pdata; // transfer data
      w->process->push(w); // onto active list
    }
    else {
      if(refcnt == 1) {
        //::std::cerr << "Concurrent channel read deletes requesting fibre " << current << :: std::endl;
        *pcurrent = current->process->pop(); // active list
        delete_concrete_object(current,current->process->process_allocator);
      } else {
        --refcnt;
   //::std::cout<< "do_read: fibre " << current << ", set channel "<< this <<" recnt to " << refcnt << ::std::endl;
        st_push_reader(current);
        unlock();
        *pcurrent = current->process->pop(); // active list
      }
    }
  }

  void write(void **source, fibre_t **pcurrent) override {
//::std::cout << "write on " << this << ::std::endl;
    fibre_t *current = *pcurrent;
    lock();
    fibre_t *r = st_pop_reader();
    if(r) {
      ++refcnt;
      unlock();
      *r->svc_req->io_request.pdata = *source;

      if(r->process == current->process) {
        current->process->push(current); // current is writer, pushed onto active list
        *pcurrent = r; // make reader current
      }
      else {
        r->process->push(r);
      }
    }
    else {
      if(refcnt == 1) {
        //::std::cerr << "Concurrent channel write deletes requesting fibre " << current << :: std::endl;
        *pcurrent = current->process->pop(); // reset current from active list
        delete_concrete_object(current,current->process->process_allocator);
      } else {
        --refcnt;
   //::std::cout<< "do_write: fibre " << current << ", set channel "<< this <<" recnt to " << refcnt << ::std::endl;
        st_push_writer(current); // i/o fail: push current onto channel
        unlock();
        *pcurrent = current->process->pop(); // reset current from active list
      }
    }
  }


};

chan_epref_t make_concurrent_channel(alloc_ref_t a) {
  return acquire_channel(a, new(a) concurrent_channel_t);
}

void system_t::connect_concurrent (chan_epref_t *left, chan_epref_t *right) {
  auto chleft= make_concurrent_channel(system_allocator);
  auto chright= chleft->dup(); 
  *left = chleft;
  *right= chright;
}
#endif

+ 7 Asynchronous Channel

share/lib/rtl/rt/async_channel.hpp

#ifndef ASYNC_CHANNEL
#define ASYNC_CHANNEL
#include "allocator.hpp"
#include "concurrent_channel.hpp"

// ASYNCHRONOUS CHANNEL
//
// This is like a concurrent channel, except 
// (a) it actively notifies possibly sleeping subscribers to the
// condition variable that the channel has changed state,
// (b) it increments the async count of an active set when a fibre
// of that set is pushed onto the channel, and
// (c) it decrements the async count when a fibre previously on the
// channel is made current or put onto an active set.
//
// NOTE: when a fibre tries to do I/O on a channel and is the
// only holder of an endpoint, the reference count will be 1.
// In this case, it must be deleted because the I/O request can never be
// satisfied. In turn this would decrement the reference count to 0,
// so the channel, and all fibres on it, also need to be deleted.
// Fibres on the channel may hold endpoints to the channel,
// so if the reference count goes to zero no action is taken,
// the channel is going to be deleted anyhow.
//
// There is no point signalling subscribers on the condition variable
// in this case: the purpose of the signal is to tell sleeping readers
// and writers that the channel state has changed, in particular that
// an unsatisfied I/O request may have been performed, causing a fibre
// on the channel to go onto an active set and become available for
// resumption.
//
// It is important to note that external devices such as a clock
// MUST prevent this by holding an endpoint to the channel.
// In particular a clock, for example, is considered active even if
// it is sleeping waiting for an alarm to go off or a request to come in.
// A clock holds a request channel endpoint, even when there are no
// clients.

struct async_channel_t : concurrent_channel_t {
  ::std::condition_variable cv;
  ::std::mutex cv_lock;

  void signal() override { cv.notify_all(); }

  size_t size() const override { return sizeof(async_channel_t); }

  async_channel_t () {}

  void read(void **target, fibre_t **pcurrent)  override {
    fibre_t *current = *pcurrent;
    ++current->process->async_count;
    lock();
    fibre_t *w = st_pop_writer();
    if(w) {
      ++refcnt;
      unlock();
      *target =
        *w->svc_req->io_request.pdata; // transfer data
      w->process->push(w); // onto active list
    }
    else {
      if(refcnt == 1) {
::std::cerr << "Async channel " << this << " read detects refcnt 1" << ::std::endl;
        *pcurrent = current->process->pop(); // reset current from active list
        delete_concrete_object(current,current->process->process_allocator);
        return; // to prevent signalling a deleted channel
      } else {
        --refcnt;
        st_push_reader(current);
        unlock();
        *pcurrent = current->process->pop(); // active list
      }
    }
    signal();
  }

  void write(void **source, fibre_t **pcurrent) override  {
    fibre_t *current = *pcurrent;
    ++current->process->async_count;
    lock();
    fibre_t *r = st_pop_reader();
    if(r) {
      ++refcnt;
      unlock();
      *r->svc_req->io_request.pdata = *source;

      if(r->process == current->process) {
        current->process->push(current); // current is writer, pushed onto active list
        *pcurrent = r; // make reader current
      }
      else {
        r->process->push(r);
      }
    }
    else {
      if(refcnt == 1) {
::std::cerr << "Async channel " << this << " write detects refcnt 1" << ::std::endl;
        *pcurrent = current->process->pop(); // reset current from active list
        delete_concrete_object(current,current->process->process_allocator);
        return; // to prevent signalling a deleted channel
      } else {
        --refcnt;
        st_push_writer(current); // i/o fail: push current onto channel
        unlock();
        *pcurrent = current->process->pop(); // reset current from active list
      }
    }
    signal();
  }


};

chan_epref_t make_async_channel(alloc_ref_t a) {
  return acquire_channel(a, new(a) async_channel_t);
}
#endif

share/lib/rtl/rt/system.hpp

#ifndef SYSTEM
#define SYSTEM
#include "allocator.hpp"
#include "channel.hpp"

// For use by the kernel CSP system
struct system_t {
  alloc_ref_t system_allocator;
  system_t (alloc_ref_t a) : system_allocator(a) {}

  void connect_sequential (chan_epref_t *left, chan_epref_t *right);
  void connect_concurrent (chan_epref_t *left, chan_epref_t *right);
  void connect_async(chan_epref_t *left, chan_epref_t *right);

};
#endif

+ 8 CSP Processes

share/lib/rtl/rt/csp_process.hpp

#ifndef CSP_PROCESS
#define CSP_PROCESS
#include "system.hpp"

// active set
struct csp_process_t {
  ::std::atomic_size_t refcnt;
  system_t *system;
  alloc_ref_t process_allocator;

  fibre_t *active;
  ::std::atomic_flag lock; // this one is a spin lock for sync ops

  // an async service which pushes a fibre onto the active
  // set also decrements the active count and must 
  // signal this condition variable to wake up all schedulers
  // so they notice the active set is now populated

  ::std::atomic_size_t async_count;
  ::std::mutex async_lock; // mutex lock for async ops
  ::std::condition_variable async_wake;

  void async_complete() { 
    //::std::cerr << "Active set: async complete" << ::std::endl;
    --async_count; async_wake.notify_all(); 
  }

  ::std::atomic_size_t running_thread_count;

  csp_process_t(system_t *s, alloc_ref_t a) : 
    refcnt(1), system(s), process_allocator(a),
    active(nullptr), lock(false), async_count(0), running_thread_count(0) 
  { 
    // ::std::cerr << "New process" << ::std::endl;
  }

  csp_process_t *share() { ++refcnt; return this; }
  void forget() { 
    if(--refcnt == 0) // atomic decrement; the last owner cleans up
      delete_concrete_object(this,system->system_allocator); 
  }

  // push a new active fibre onto active list
  void push(fibre_t *fresh) { 
// ::std::cerr << "Active set push " << fresh << ::std::endl;
    while(lock.test_and_set(::std::memory_order_acquire)); // spin
    fresh->next = active; 
    active = fresh; 
    lock.clear(::std::memory_order_release); // release lock
  }
  // pop an active fibre off the active list
  fibre_t *pop() {
// ::std::cerr << "Active set pop .. " << ::std::endl;
    while(lock.test_and_set(::std::memory_order_acquire)); // spin
    fibre_t *tmp = active;
    if(tmp)active = tmp->next;
    lock.clear(::std::memory_order_release); // release lock
// ::std::cerr << "Active set popped .. " << tmp << ::std::endl;
    return tmp;
  }
};
#endif

+ 9 CSP threads

share/lib/rtl/rt/csp_thread.hpp

#ifndef CSP_THREAD
#define CSP_THREAD
#include <cassert>
#include <chrono>
#include <thread>
// csp_thread_t4.hpp
#include "system.hpp"
#include "svc.hpp"
#include "csp_process.hpp"

// scheduler
struct csp_thread_t {
  csp_process_t *process;  // chain of fibres ready to run

  fibre_t *current; // currently running fibre, nullptr if none
  ~csp_thread_t() { process->forget(); }

  csp_thread_t(csp_process_t *a) : current(nullptr), process(a) {}

  void sync_run(con_t *);
  void do_read(io_request_t *req);
  void do_write(io_request_t *req);
  void do_spawn_fibre(spawn_fibre_request_t *req);
  void do_spawn_fibre_deferred(spawn_fibre_request_t *req);
  void do_spawn_process(spawn_process_request_t *req);
  void do_spawn_cothread(spawn_fibre_request_t *req);
};

extern void csp_run(system_t *system, alloc_ref_t process_allocator, con_t *init) {
::std::cerr << "csp_run start" << ::std::endl;
  csp_thread_t (new(system->system_allocator) csp_process_t(system, process_allocator)).sync_run(init);
::std::cerr << "csp_run over " << ::std::endl;
}

// scheduler subroutine runs until there is no work to do
void csp_thread_t::sync_run(con_t *cc) {
  current = new(process->process_allocator) fibre_t(cc, process);
  cc->fibre = current;
  ++process->running_thread_count;
retry:
  while(current) // while there's work to do 
  {
    current->svc_req = nullptr; // null out service request
    svc_req_t *svc_req = current->run_fibre();
    if(svc_req)  // fibre issued service request
      switch (svc_req->get_code()) 
      {
        case read_request_code_e: 
          do_read(&(svc_req->io_request));
          break;
        case write_request_code_e:  
          do_write(&(svc_req->io_request));
          break;
        case spawn_fibre_request_code_e:  
          do_spawn_fibre(&(svc_req->spawn_fibre_request));
          break;
        case spawn_fibre_deferred_request_code_e:  
          do_spawn_fibre_deferred(&(svc_req->spawn_fibre_request));
          break;
        case spawn_process_request_code_e:  
          do_spawn_process(&(svc_req->spawn_process_request));
          break;
        case spawn_cothread_request_code_e:  
          do_spawn_cothread(&(svc_req->spawn_fibre_request));
          break;
        default:
          assert(false);
      }
    else // the fibre returned without issuing a request so should be dead
    {
      assert(!current->cc); // check it's a dead fibre
      //::std::cerr << "csp_thread: null continuation in fibre, deleting fibre " << current << ::std::endl;
      auto old_current = current;
      current = nullptr;
      delete_concrete_object(old_current,old_current->process->process_allocator);
      current = process->pop();      // get more work
      //::std::cerr << "csp_thread: new current fibre " << current << ::std::endl;
    }
  }

  // decrement running thread count
  process->running_thread_count--;

  // Async events can reload the active set, but they do NOT change current
rewait:
  // if the async count > 0 we're waiting for the async op to complete
  // if the running thread count > 0 we're waiting for other threads to stall
//  //::std::cerr << "Scheduler out of fibres: async count = " << process->async_count.load() << ::std::endl;
  if(process->async_count.load() > 0 || process->running_thread_count.load() > 0) {
    // delay
    {
////::std::cerr << "Scheduler sleeping (inf)" << ::std::endl;
      ::std::unique_lock<::std::mutex> lk(process->async_lock);
      process->async_wake.wait_for(lk,::std::chrono::milliseconds(100000));
    } // lock released now
    current = process->pop();      // get more work
    if(current) {
      process->running_thread_count++;
      goto retry;
    }
    goto rewait;
  }

//  //::std::cerr << "Scheduler out of work, returning" << ::std::endl;
}


void csp_thread_t::do_read(io_request_t *req) {
  req->chan->get()->channel->read(current->svc_req->io_request.pdata, &current);
}

void csp_thread_t::do_write(io_request_t *req) {
 req->chan->get()->channel->write(current->svc_req->io_request.pdata, &current);
}


void csp_thread_t::do_spawn_fibre(spawn_fibre_request_t *req) {
// //::std::cerr << "do spawn" << ::std::endl;
  current->svc_req=nullptr;
  process->push(current);
  con_t *cc= req->tospawn;
  current = new(process->process_allocator) fibre_t(cc, process);
  cc->fibre = current;
// //::std::cerr << "spawned " << current << ::std::endl;
}

void csp_thread_t::do_spawn_fibre_deferred(spawn_fibre_request_t *req) {
// //::std::cerr << "do spawn deferred" << ::std::endl;
  current->svc_req=nullptr;
  con_t *init = req->tospawn;
  fibre_t *d = new(process->process_allocator) fibre_t(init, process);
  init->fibre = d;
  process->push(d);
// //::std::cerr << "spawn deferred " << d << ::std::endl;
}

static void spawn(csp_process_t *pa, con_t *cc) {
  csp_thread_t(pa).sync_run(cc);
}
void csp_thread_t::do_spawn_process(spawn_process_request_t *req) {
  // the new process must not shadow the member `process`,
  // which is needed to reach the system allocator
  csp_process_t *new_process = new(process->system->system_allocator) csp_process_t(process->system, req->process_allocator);
  ::std::thread(spawn,new_process,req->tospawn).detach();
}

void csp_thread_t::do_spawn_cothread(spawn_fibre_request_t *req) {
  current->process->refcnt++;
  ::std::thread(spawn,current->process,req->tospawn).detach();
}
#endif

+ 10 CSP: the lot

share/lib/rtl/rt/csp.hpp

#ifndef CSP
#define CSP
#include <typeinfo>

// forward decls
struct csp_clock_t;
struct fibre_t;
struct channel_t;
struct csp_process_t;
struct con_t;
struct allocator_t;
struct alloc_ref_t;
struct system_t;
struct channel_endpoint_t;
struct chan_epref_t;

// the csp system
#include "allocator.hpp"
#include "malloc_free.hpp"
//#include "utility_allocators.hpp"
//#include "freelist.hpp"
#include "system_allocator.hpp"
#include "system.hpp"

#include "con.hpp"
#include "fibre.hpp"

#include "csp_process.hpp"
#include "svc.hpp"

con_t *coroutine_t::return_control()
{
//::std::cerr << "Coroutine " << this << " returns control" << ::std::endl;
  delete_csp_polymorphic_object(this,fibre->process->process_allocator);
  return nullptr;
}

con_t *subroutine_t::return_control() {
  auto tmp = caller;
  delete_csp_polymorphic_object(this,fibre->process->process_allocator);
  return tmp;
}


// resolve circular reference
fibre_t::~fibre_t()
{
  //::std::cerr << "Fibre destructor " << this << ::std::endl;
  while(cc) cc= cc->return_control(); 
}

#include "channel.hpp"
#include "csp_thread.hpp"
#include "sequential_channel.hpp"
#include "concurrent_channel.hpp"
#include "async_channel.hpp"
//#include "clock.hpp"

#define CSP_SUBRETURN return return_control();
#define CSP_COSUICIDE return return_control();

#define CSP_CALLDEF_START \
  con_t *call(con_t *caller_a

#define CSP_CALLDEF_MID ){\
  caller = caller_a;\
  pc = 0;

#define CSP_CALLDEF_END \
  return this;\
}

#define CSP_CODEF_START \
  con_t *setup(

#define CSP_CODEF_MID ){\
  pc = 0;

#define CSP_CODEF_END \
  return this;\
}
#define CSP_RESUME_START\
  con_t *resume() override {\
  switch(pc++){\
  case 0:

#define CSP_RESUME_END\
  default: assert(false);\
  }}

#define CSP_SIZE \
  size_t size() const override { return sizeof(*this); } 
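
To make the control flow concrete: for a coroutine whose body is a single statement S, the sequence CSP_RESUME_START S CSP_COSUICIDE CSP_RESUME_END CSP_SIZE expands to approximately:

con_t *resume() override {
  switch(pc++) {
  case 0:
    S;
    return return_control();
  default: assert(false);
  }
}
size_t size() const override { return sizeof(*this); }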

#define SVC_READ_REQ(xpreq,xpchan,xpdata)\
  (xpreq)->svc_code = read_request_code_e;\
  (xpreq)->pdata = (void**)xpdata;\
  (xpreq)->chan = xpchan;

#define SVC_WRITE_REQ(xpreq,xpchan,xpdata)\
  (xpreq)->svc_code = write_request_code_e;\
  (xpreq)->pdata = (void**)xpdata;\
  (xpreq)->chan = xpchan;

#define SVC_ASYNC_WRITE_REQ(xpreq,xpchan,xpdata)\
  (xpreq)->svc_code = async_write_request_code_e;\
  (xpreq)->pdata = (void**)xpdata;\
  (xpreq)->chan = xpchan;

#define SVC_SPAWN_FIBRE_REQ(xpreq,xcont)\
  (xpreq)->svc_code = spawn_fibre_request_code_e;\
  (xpreq)->tospawn = xcont;

#define SVC_SPAWN_FIBRE_DEFERRED_REQ(xpreq,xcont)\
  (xpreq)->svc_code = spawn_fibre_deferred_request_code_e;\
  (xpreq)->tospawn = xcont;

#define SVC_SPAWN_PTHREAD_REQ(xpreq,xcont)\
  (xpreq)->svc_code = spawn_cothread_request_code_e;\
  (xpreq)->tospawn = xcont;

#define SVC(preq)\
  fibre->svc_req = (svc_req_t*)(void*)preq;\
  return this;

#define CSP_GOTO(caseno)\
  pc = caseno;\
  return this;

/*
#define CSP_CALL_DIRECT0(procedure)\
  return (new procedure(global))->call(this);

#define CSP_CALL_DIRECT1(procedure,arg)\
  return (new procedure(global))->call(this,arg);

#define CSP_CALL_DIRECT2(procedure,arg1,arg2)\
  return (new procedure(global))->call(this,arg1,arg2);


#define CSP_CALL_DIRECT3(procedure,arg1,arg2,arg3)\
  return (new procedure(global))->call(this,arg1,arg2,arg3);
**/
#endif

+ 11 Simple test case 1

$PWD/csp_01.cxx

#include <iostream>
#include <cassert>
#include <list>
#include <vector>


#include "csp.hpp"
#include "statistics_allocator.hpp"

//#include "chips.hpp"

// TEST CASE

struct hello : coroutine_t {

  CSP_RESUME_START
    ::std::cerr << "Hello World" << ::std::endl;
    CSP_COSUICIDE
  CSP_RESUME_END
  CSP_SIZE
};


struct producer : coroutine_t {
  ::std::list<int> *plst;
  ::std::list<int>::iterator it;
  chan_epref_t out;
  union {
    void *iodata;
    int value;
  };
  io_request_t w_req;

  ~producer() { }

  CSP_CODEF_START
    ::std::list<int> *plst_a,
    chan_epref_t outchan_a
  CSP_CODEF_MID
    plst = plst_a;
    out = outchan_a;
  CSP_CODEF_END

  CSP_RESUME_START
    it = plst->begin();
    SVC_WRITE_REQ(&w_req,&out,&iodata);

  case 1:
    if(it == plst->end()) { 
      CSP_COSUICIDE
    }
    value = *it++;
    pc = 1;
    SVC(&w_req)

  CSP_RESUME_END
  CSP_SIZE
};

struct consumer: coroutine_t {
  ::std::list<int> *plst;
  union {
    void *iodata;
    int value;
  };
  io_request_t r_req;
  chan_epref_t inp;

  ~consumer() {}

  CSP_CODEF_START
    ::std::list<int> *plst_a,
    chan_epref_t inchan_a
  CSP_CODEF_MID
    plst = plst_a;
    inp = inchan_a;
  CSP_CODEF_END

  CSP_RESUME_START  
    SVC_READ_REQ(&r_req,&inp,&iodata)

  case 1:
    SVC(&r_req)

  case 2:
    ::std::cerr << "Consumer gets " << value << ::std::endl;
    plst->push_back(value);
    CSP_GOTO(1)

  CSP_RESUME_END
  CSP_SIZE
};

struct square : subroutine_t {
  int inp;
  int *pout;

  square(fibre_t *f) : subroutine_t(f) {}
 
  CSP_CALLDEF_START,
    int *pout_a,
    int inp_a
  CSP_CALLDEF_MID
    pout = pout_a;
    inp = inp_a;
  CSP_CALLDEF_END

  CSP_RESUME_START
    *pout = inp * inp;
    {
      con_t *tmp = caller;
      delete_concrete_object(this, fibre->process->process_allocator);
      return tmp;
    }
  CSP_RESUME_END
  CSP_SIZE
};


struct transducer: coroutine_t {
  union {
    void *iodata;
    int value;
  };
  io_request_t r_req;
  io_request_t w_req;
  chan_epref_t inp;
  chan_epref_t out;

  ~transducer() {}

  CSP_CODEF_START
    chan_epref_t inchan_a,
    chan_epref_t outchan_a
  CSP_CODEF_MID 
    inp = inchan_a;
    out = outchan_a;
  CSP_CODEF_END
  
  CSP_RESUME_START
    SVC_READ_REQ(&r_req,&inp,&iodata)
    SVC_WRITE_REQ(&w_req,&out,&iodata)

  case 1:
    SVC(&r_req)

  case 2:
    //CSP_CALL_DIRECT2(square,&value,value)
    return (new(fibre->process->process_allocator) square(fibre))->call(this,&value,value);

  case 3:
    pc = 1;
    SVC(&w_req)

  CSP_RESUME_END
  CSP_SIZE
};


struct init: coroutine_t {
  ::std::list<int> *inlst;
  ::std::list<int> *outlst;
  spawn_fibre_request_t spawn_req;
  chan_epref_t ch1out;
  chan_epref_t ch1inp;
  chan_epref_t ch2out;
  chan_epref_t ch2inp;
  chan_epref_t clock_connection;
  io_request_t clock_req;
  double waituntil;
  double *pwaituntil;
  ::std::shared_ptr<csp_clock_t> clock;

  ~init() {}

  // store parameters in
  CSP_CODEF_START
    ::std::list<int> *lin,
    ::std::list<int> *lout
  CSP_CODEF_MID
    inlst = lin;
    outlst = lout;
  CSP_CODEF_END

  CSP_RESUME_START
    ch1out = make_concurrent_channel(fibre->process->system->system_allocator);
    ch1inp = ch1out->dup(); 
    ch2out = make_concurrent_channel(fibre->process->system->system_allocator);
    ch2inp = ch2out->dup();
 
    SVC_SPAWN_FIBRE_DEFERRED_REQ(&spawn_req, (new(fibre->process->process_allocator) producer)->setup(inlst, ch1out))
    SVC(&spawn_req)
 
  case 1:
    SVC_SPAWN_FIBRE_DEFERRED_REQ(&spawn_req, (new(fibre->process->process_allocator) transducer)->setup(ch1inp, ch2out))
    SVC(&spawn_req)
 
  case 2:
    SVC_SPAWN_FIBRE_DEFERRED_REQ(&spawn_req, (new(fibre->process->process_allocator) consumer)->setup(outlst,ch2inp))
    SVC(&spawn_req)
 
  case 3:
    SVC_SPAWN_FIBRE_DEFERRED_REQ(&spawn_req, (new(fibre->process->process_allocator) hello))
    SVC(&spawn_req)
  case 4:
/*
    { 
      clock = make_clock(fibre->process->system);
      clock->start();
      //::std::cerr << "Clock started, time is " << clock->now() << ::std::endl;
      clock_connection = clock->connect();
      //::std::cerr << "Got connection" << ::std::endl;
      {
        double rightnow = clock->now();
        waituntil = rightnow + 12.10;
        //::std::cerr << ::std::fixed << "Wait until" << waituntil << " for " << waituntil - rightnow << " seconds" << ::std::endl;
      }
      //::std::cerr << "Alarm time " << waituntil << ", stored at " << &waituntil
      //  << "=" << pwaituntil << " which is stored at " << &pwaituntil << ::std::endl;
      pwaituntil = &waituntil;
      SVC_WRITE_REQ(&clock_req,&clock_connection,&pwaituntil);
      ::std::cerr<<"****** INIT Sleeping ********" << ::std::endl;
    }
// too early to signal
// need different kind of channel w. cv
    SVC(&clock_req)
// too late to signal
  case 5:
    // if this doesn't print, we didn't resume after the sleep correctly
**/
    ::std::cerr<<"****** INIT Sleep Over ********" << ::std::endl;
    CSP_COSUICIDE


  CSP_RESUME_END
  CSP_SIZE
}; // init class

#include <iostream>

int main() {
  // create the input list
  ::std::list<int> inlst;
  for (auto i = 0; i < 20; ++i) inlst.push_back(i);

  // empty output list
  ::std::list<int> outlst;

  {
    ::std::vector<mem_req_t> reqs;
    reqs.push_back(mem_req_t {sizeof(hello),50});
    reqs.push_back(mem_req_t {sizeof(producer),50});
    reqs.push_back(mem_req_t {sizeof(transducer),50});
    reqs.push_back(mem_req_t {sizeof(consumer),50});
    reqs.push_back(mem_req_t {sizeof(init),50});
    reqs.push_back(mem_req_t {sizeof(square),50});
    reqs.push_back(mem_req_t {256,50});
    reqs.push_back(mem_req_t {512,50});

    ::std::cout << "N reqs = " << reqs.size() << ::std::endl;
    auto fixed_reqs = fixup(reqs); 
    ::std::cout << "fixed up N reqs = " << fixed_reqs.size() << ::std::endl;
    for(auto req : fixed_reqs) ::std::cout << req.block_size << " x " << req.n_blocks << ::std::endl;
    // bootstrap allocator
    alloc_ref_t malloc_free = malloc_free_allocator_t::create(); // parent C++ allocator
/*
    //alloc_ref_t malloc_free_debugger = new(malloc_free) debugging_allocator_t(malloc_free, malloc_free, "Malloc");


    // system allocator
    alloc_ref_t system_allocator_delegate = new(malloc_free_debugger) system_allocator_t(malloc_free_debugger,malloc_free_debugger, reqs);
    alloc_ref_t system_allocator_debugger = new(malloc_free_debugger) debugging_allocator_t(malloc_free_debugger, system_allocator_delegate, "Sys");
    alloc_ref_t system_allocator = new(malloc_free_debugger) statistics_allocator_t( malloc_free_debugger, system_allocator_debugger, "Sysalloc.stats.txt");


    // initial process will use the system allocator
    alloc_ref_t process_allocator = system_allocator;
**/
    alloc_ref_t x = system_allocator_t::create(malloc_free, fixed_reqs);
    alloc_ref_t system_allocator = statistics_allocator_t::create(malloc_free, x, "xstats.txt");
    alloc_ref_t process_allocator = system_allocator;

    // create the system (the clock is commented out in this build)
    system_t *system = new system_t(system_allocator);
::std::cout << "STARTING" << ::std::endl;
    csp_run(system, process_allocator, (new(process_allocator) init)->setup(&inlst, &outlst));
::std::cerr << "RUN COMPLETE" << ::std::endl;
    delete system;
  }

  // the result is now in the outlist so print it
  ::std::cerr<< "main: +++++++++ List of squares:" << ::std::endl;
  for(auto v : outlst) ::std::cerr << v << ::std::endl;
  ::std::cerr<< "main: +++++++++ Done" << ::std::endl;
}