1 Continuations
share/lib/rtl/rt/con.hpp
#ifndef CON #define CON // continuation #include <cstddef> using size_t = ::std::size_t; struct fibre_t; struct con_t { fibre_t *fibre; int pc; // program counter con_t () : pc(0), fibre(nullptr) {} virtual con_t *return_control()=0; virtual con_t *resume()=0; virtual size_t size()const=0; virtual ~con_t(){} }; // coroutine struct coroutine_t : con_t { con_t *return_control(); // deletes object and returns nullptr }; // top level subroutine struct subroutine_t : con_t { subroutine_t(fibre_t *f) { fibre = f; } con_t *caller; // caller continuation con_t *return_control(); // deletes object and returns caller }; // NOTE: a subroutine must not delete itself if it returns // a nested procedure which binds to its local variable frame struct curry_subroutine_t : con_t { con_t *caller; // caller continuation curry_subroutine_t (fibre_t *f) { fibre = f; } con_t *return_control() { auto tmp = caller; caller = nullptr; return tmp; } }; #endif
2 Fibres
share/lib/rtl/rt/fibre.hpp
#ifndef FIBRE #define FIBRE // fibre3.hpp // fibre #include "con.hpp" struct csp_process_t; union svc_req_t; struct fibre_t { con_t *cc; fibre_t *next; csp_process_t *process; svc_req_t *svc_req; // request // default DEAD //fibre_t() : cc(nullptr), next(nullptr), process(nullptr) {} // construct from continuation fibre_t(con_t *ccin, csp_process_t *owned_by) : cc(ccin), next (nullptr), process(owned_by), svc_req(nullptr) { ccin->fibre=this; } // immobile fibre_t(fibre_t const&)=delete; fibre_t& operator=(fibre_t const&)=delete; // destructor deletes any remaining continuations in spaghetti stack ~fibre_t(); // defined in csp.hpp to resolve circular reference // run until either fibre issues a service request or dies svc_req_t *run_fibre() { while(cc) { cc=cc->resume(); if(cc && svc_req) return svc_req; } return nullptr; } }; #endif
3 Channel Design
A channel is a lightweight synchronisation device which allows fibres to perform two operations: read and write. When a read is performed by a channel owning fibre, if there is a writer on the channel, data is transfered, the writer is removed from the channel, and both the reader and writer become active.
If there is no writer on the channel, the reader is suspended and added to the channel: the channel now owns the fibre.
Write operation is dual.
If no one owns a channel, the channel is deleted along with all the fibres it owns. Similarly, if no one owns a fibre, it is deleted, and shared ownership of the any channels the fibre has will be relinquished. Together this can lead to a recursive deletion of many channels and fibres.
For example in a pipeline, if one fibre in the middle decides to quit, it is deleted, and now the components on either side of it can no longer write or read, so they will be deleted as well, and the effect cascades and destroys the whole pipeline.
To make this work, we have to use a double reference counting scheme. Objects called channel endpoints reference count the channel they're endpoints for. In turn, each endpoint is accessed by a smart pointer which reference counts it, and can be passed around the continuations of the fibre. The fibre itself must maintain an array of all the endpoints it owns, the registry.
When a fibre suspends on an I/O operation this is what happens: first, the fibre is removed from the scheduler, and added to the channel. We use a linked list in the channel. The reference count of the fibre doesn't change at this point, since we simply moved ownership to the channel.
Next, we have to decrement the reference count of all the registered endpoints, since none of them can do any work, since the owning fibre is no longer active. When the fibre is resumed all these reference counts must be incremented again.
When the reference count of a channel endpoint would be decremented to zero, the endpoint is deleted, which causes the reference count of the channel it refers to to be decremented.
And clearly, when the channel reference count goes to zero, it is deleted too. When the channel is deleted, all the fibres owned by the channel are also deleted, since none of them can become active again, since they're waiting on I/O on the channel, which no longer exists.
Deleting a fibre requires deleting all the continuations in its stack. As this happens, references to channel endpoints are deleted, decrementing the endpoint reference counts, until only the registry of the fibre remains. When the references in the registry are deleted all the endpoints will be deleted, which will decrement, and possibly delete, the channel referred to.
The implementation is not quite trivial, since it is recursive and cycles must be broken to prevent an infinite recursion. This is done by marking the channel when deletion is in progress, which stops attempts to delete the owned fibres (since they're already being deleted). In fact, we use the channel refcount itself as the mark: if the refcount is zero, deletion of the channel is in progress.
Note that channels are shared between fibres, but an endpoint msut be exclusively owned by a single fibre. The smart pointer to the channel endpoints can be copied, assigned, and moved around but only within continuations of the fibre.
When a fibre is spawned it must be passed references to channel endpoints, which is done by constructing its registry and passing that. It is vital the spawner delete all these endpoints it is not allowed to own. This should be done as soon as possible because whilst it owns these endpoints, the referred to channel is still considered reachable. This must be done before the next I/O operation because these endpoints are not registered in the creators registry. This means if it is suspended, the reference counts of the end points will not be decremented as required.
One should note this is not trivial. When a fibre wants to spawn several others and connect them with channels, it must hold the appropriate endpoint array for each fibre it will spawn until they're done. On the other hand since the first spawned fibre might start immediately and immediately do I/O, the endpoints the other ends of the channels must be held long enough to ensure that fibre does not immediately suicide.
Another point to note is that a channel must have a reference count of at least 2 to be useful, one for each of two endpoints in different active fibres.
Finally note, any number of fibres can subscribe to a channel. Channels are untyped can can operate in any direction. Similarly, endpoints are not intrinsically input or output. A channel is, of course, doomed if there is only one subscriber.
share/lib/rtl/rt/channel.hpp
#ifndef CHANNEL #define CHANNEL #include <memory> #include <cstdint> #include <iostream> using uintptr_t = ::std::uintptr_t; #include "fibre.hpp" #include "allocator.hpp" struct channel_endpoint_t; // CHANNEL ABSTRACTION // low bit fiddling routines inline static bool get_lowbit(void *p) { return (uintptr_t)p & (uintptr_t)1u; } inline static void *clear_lowbit(void *p) { return (void*)((uintptr_t)p & ~(uintptr_t)1u); } inline static void *set_lowbit(void *p) { return (void*)((uintptr_t)p | (uintptr_t)1u); } // channel struct channel_t { ::std::atomic_size_t refcnt; fibre_t *top; channel_t () : top (nullptr), refcnt(0) {} virtual ~channel_t(){ //::std::cerr << "channel " << this << " destructor" << ::std::endl; } // immobile object channel_t(channel_t const&)=delete; channel_t& operator= (channel_t const&)=delete; // required for the polymorphic deleter virtual size_t size() const = 0; virtual void signal()=0; // channel read operation virtual void read(void **target, fibre_t **pcurrent)=0; // channel write operation virtual void write(void **source, fibre_t **pcurrent)=0; protected: // basic push and pop operations, not thread safe void st_push_reader(fibre_t *r) { r->next = top; top = r; } void st_push_writer(fibre_t *w) { w->next = top; top = (fibre_t*)set_lowbit(w); } fibre_t *st_pop_reader() { fibre_t *tmp = top; if(!tmp || get_lowbit(tmp))return nullptr; top = top -> next; return tmp; // lowbit is clear, its a reader } fibre_t *st_pop_writer() { fibre_t *tmp = top; if(!tmp || !get_lowbit(tmp)) return nullptr; tmp = (fibre_t*)clear_lowbit(tmp); // lowbit is set for writer top = tmp -> next; return tmp; } }; struct chan_epref_t { channel_endpoint_t *endpoint; chan_epref_t(channel_endpoint_t *p) : endpoint(p) {} // endpoint ctor sets refcnt to 1 initially channel_endpoint_t *operator->()const { return endpoint; } channel_endpoint_t *get() const { return endpoint; } chan_epref_t() : endpoint(nullptr) {} // rvalue move chan_epref_t(chan_epref_t &&p) { if(p.endpoint) { endpoint = p.endpoint; p.endpoint = nullptr; } else endpoint = nullptr; } // lvalue copy chan_epref_t(chan_epref_t &p); // rvalue assign void operator= (chan_epref_t &&p) { if (&p!=this) { // ignore self assign this->~chan_epref_t(); // destroy target new(this) chan_epref_t(::std::move(p)); // move source to target } } // rass // lvalue assign void operator= (chan_epref_t &p) { if(&p!=this) { // ignore self assign this->~chan_epref_t(); // destroy target new(this) chan_epref_t(p); // copy source to target } } // lass // destructor // uses allocator passed to endpoint on construction to delete it ~chan_epref_t(); }; // the allocator used to construct the endpoint // must be passed to it so it can be used to delete it // it MUST be the same allocator used to construct the channel // try to FIXME so it is .. this requires storing the allocator in // the channel OR ensuring all channels and endpoints are constructed // using the system allocator struct channel_endpoint_t { size_t refcnt; channel_t *channel; alloc_ref_t allocator; channel_endpoint_t(channel_t *p, alloc_ref_t a) : allocator(a), channel(p), refcnt(1) { ++channel->refcnt; } // immobile object channel_endpoint_t(channel_endpoint_t const&)=delete; channel_endpoint_t& operator= (channel_endpoint_t const&)=delete; // create a duplicate of the current endpoint refering // to the same channel. Returns a chan_epref_t; a shared // pointer to the new endpoint. Incredrummoyne@corleonemarinas.comments the counter // of endpoints in the channel. // note, C++ must construct a single object containing // both the reference count and the channel endpoint. chan_epref_t dup() { return chan_epref_t(new(allocator) channel_endpoint_t(channel, allocator)); } ~channel_endpoint_t () { //::std::cerr << "Channel endpoint " << this << " destructor, channel " << channel << ", refcnt " << channel->refcnt.load() << ::std::endl; switch (channel->refcnt.load()) { case 0: break; case 1: delete_channel(); break; default: --channel->refcnt; break; } } void delete_channel() { //::std::cerr << "Delete channel " << channel << ", refcnt " << channel->refcnt.load() << ::std::endl; fibre_t *top = channel->top; channel->top = nullptr; channel->refcnt.store(0); while (top) { fibre_t *f = (fibre_t*)clear_lowbit(top); fibre_t *tmp = f->next; // ::std::cerr << "Channel " << channel << " Deletes fibre " << f << ", next=" << tmp << ::std::endl; delete_concrete_object(f, allocator); top = tmp; } //::std::cerr << "Deleting channel " << channel << " now" << ::std::endl; delete_csp_polymorphic_object(channel, allocator); } }; // lvalue copy chan_epref_t::chan_epref_t(chan_epref_t &p) { if(p.endpoint) { endpoint = p.endpoint; p.endpoint->refcnt++; } else endpoint = nullptr; } // destructor // uses allocator passed to endpoint on construction to delete it chan_epref_t::~chan_epref_t() { if (endpoint) { if(endpoint->refcnt == 1) { //::std::cerr << "Endpoint ref " << this << " deletes endpoint " << endpoint << ::std::endl; delete_concrete_object(endpoint, endpoint->allocator); } else --endpoint->refcnt; } } // dtor chan_epref_t acquire_channel(alloc_ref_t a, channel_t *p) { return chan_epref_t(new(a) channel_endpoint_t(p,a)); } #endif
4 Service Requests
share/lib/rtl/rt/svc.hpp
#ifndef SVC_HPP #define SVC_HPP #include "allocator.hpp" struct chan_epref_t; struct con_t; // service requests // // Request codes enum svc_code_t { read_request_code_e, write_request_code_e, spawn_fibre_request_code_e, spawn_fibre_deferred_request_code_e, spawn_process_request_code_e, spawn_cothread_request_code_e }; // synchronous I/O requests struct io_request_t { svc_code_t svc_code; chan_epref_t *chan; void **pdata; }; // fibre and cothread spawn requests struct spawn_fibre_request_t { svc_code_t svc_code; con_t *tospawn; }; // fibre and cothread spawn requests struct spawn_process_request_t { svc_code_t svc_code; con_t *tospawn; alloc_ref_t process_allocator; }; // unified service request type (only used for casts) union svc_req_t { io_request_t io_request; spawn_fibre_request_t spawn_fibre_request; spawn_process_request_t spawn_process_request; svc_code_t get_code () const { return io_request.svc_code; } }; #endif
5 Sequential Channel
share/lib/rtl/rt/sequential_channel.hpp
#ifndef SEQUENTIAL_CHANNEL #define SEQUENTIAL_CHANNEL #include "channel.hpp" #include "svc.hpp" #include "csp_process.hpp" // SINGLE THREADED CHANNEL (no locking) // channel struct sequential_channel_t : channel_t { sequential_channel_t () : channel_t() {} size_t size() const override { return sizeof(sequential_channel_t); } void signal() override {} void read(void **target, fibre_t **pcurrent) override { //::std::cerr << *pcurrent << " Sequential Channel read begins " << ::std::endl; fibre_t *current = *pcurrent; fibre_t *w = st_pop_writer(); //::std::cerr << *pcurrent << " Sequential Channel read finds writer " << w << ::std::endl; if(w) { ++refcnt; *target = *w->svc_req->io_request.pdata; // transfer data w->process->push(w); // onto active list } else { if(refcnt == 1) { *pcurrent = current->process->pop(); // active list //::std::cerr << "channel read " << this << " deletes fibre " << current << ::std::endl; delete_concrete_object(current,current->process->process_allocator); } else { --refcnt; st_push_reader(current); *pcurrent = current->process->pop(); // active list } } } void write(void **source, fibre_t **pcurrent) override { //::std::cerr << *pcurrent << " Sequential Channel write begins " << ::std::endl; fibre_t *current = *pcurrent; fibre_t *r = st_pop_reader(); //::std::cerr << *pcurrent << " Sequential Channel write finds reader " << r << ::std::endl; if(r) { ++refcnt; *r->svc_req->io_request.pdata = *source; if(r->process == current->process) { current->process->push(current); // current is writer, pushed onto active list *pcurrent = r; // make reader current } else { r->process->push(r); } } else { if(refcnt == 1) { *pcurrent = current->process->pop(); // reset current from active list //::std::cerr << "channel write " << this << " deletes fibre " << current << ::std::endl; delete_concrete_object(current,current->process->process_allocator); } else { --refcnt; st_push_writer(current); // i/o fail: push current onto channel *pcurrent = current->process->pop(); // reset current from active list } } } }; chan_epref_t make_sequential_channel(alloc_ref_t a) { return acquire_channel(a, new(a) sequential_channel_t); } void system_t::connect_sequential (chan_epref_t *left, chan_epref_t *right) { auto chleft= make_sequential_channel(system_allocator); auto chright= chleft ->dup(); *left = chleft; *right= chright; } #endif
6 Concurrent Channel
share/lib/rtl/rt/concurrent_channel.hpp
#ifndef CONCURRENT_CHANNEL #define CONCURRENT_CHANNEL #include "allocator.hpp" #include "system.hpp" #include "sequential_channel.hpp" // CONCURRENT CHANNEL (operations are locked, but no async) struct concurrent_channel_t : sequential_channel_t { ::std::atomic_flag lk; void lock() { while(lk.test_and_set(::std::memory_order_acquire)); } void unlock() { lk.clear(::std::memory_order_release); } concurrent_channel_t () : lk(false) {} size_t size() const override { return sizeof(concurrent_channel_t); } void read(void **target, fibre_t **pcurrent) override { //::std::cout << "read on " << this << ::std::endl; fibre_t *current = *pcurrent; lock(); fibre_t *w = st_pop_writer(); if(w) { ++refcnt; unlock(); *target = *w->svc_req->io_request.pdata; // transfer data w->process->push(w); // onto active list } else { if(refcnt == 1) { //::std::cerr << "Concurrent channel read deletes requesting fibre " << current << :: std::endl; *pcurrent = current->process->pop(); // active list delete_concrete_object(current,current->process->process_allocator); } else { --refcnt; //::std::cout<< "do_read: fibre " << current << ", set channel "<< this <<" recnt to " << refcnt << ::std::endl; st_push_reader(current); unlock(); *pcurrent = current->process->pop(); // active list } } } void write(void **source, fibre_t **pcurrent) override { //::std::cout << "write on " << this << ::std::endl; fibre_t *current = *pcurrent; lock(); fibre_t *r = st_pop_reader(); if(r) { ++refcnt; unlock(); *r->svc_req->io_request.pdata = *source; if(r->process == current->process) { current->process->push(current); // current is writer, pushed onto active list *pcurrent = r; // make reader current } else { r->process->push(r); } } else { if(refcnt == 1) { //::std::cerr << "Concurrent channel write deletes requesting fibre " << current << :: std::endl; *pcurrent = current->process->pop(); // reset current from active list delete_concrete_object(current,current->process->process_allocator); } else { --refcnt; //::std::cout<< "do_write: fibre " << current << ", set channel "<< this <<" recnt to " << refcnt << ::std::endl; st_push_writer(current); // i/o fail: push current onto channel unlock(); *pcurrent = current->process->pop(); // reset current from active list } } } }; chan_epref_t make_concurrent_channel(alloc_ref_t a) { return acquire_channel(a, new(a) concurrent_channel_t); } void system_t::connect_concurrent (chan_epref_t *left, chan_epref_t *right) { auto chleft= make_concurrent_channel(system_allocator); auto chright= chleft->dup(); *left = chleft; *right= chright; } #endif
7 Asynchronous channel
share/lib/rtl/rt/async_channel.hpp
#ifndef ASYNC_CHANNEL #define ASYNC_CHANNEL #include "allocator.hpp" #include "concurrent_channel.hpp" // ASYNCHRONOUS CHANNEL // // This is like a concurrent channel, except // (a) it actively notifies possibly sleeping subscribers // to the condition variable // that the channel has changed state. // (b) It increments the async count of an active set when a fibre // of that set is pushed onto the channel // (c) decrements the async count when a fibre previously on the channel // is made currect or put onto an active set // // NOTE: when a fibre tries to do I/O on a channel and is the // only holder of an endpoint, the reference count will be 1. // In this case, it must be deleted because the I/O request can never be // satisfied. In turn this would decrement the reference count to 0, // so the channel, and all fibres on it, also need to be deleted. // Fibres on the channel may hold endpoints to the channel, // so if the reference count goes to zero no action is taken, // the channel is going to be deleted anyhow. // // There is no point signaling subscribers to the condition variable, // because the purpose of that is to wake up readers and // writers that the channel state has changed, in particular, an // unsatisfied I/O request may have been performed, causing a fibre // on the channel to now go onto an active set and be available for // resumption. // // It is important to note that external devices such as a clock // MUST prevent this by holding an endpoint to the channel. // In particular a clock, for example, is considered active even if // it is sleeping waiting for an alarm to go off or a request to come in. // A clock holds a request channel endpoint, even when there are no // clients. struct async_channel_t : concurrent_channel_t { ::std::condition_variable cv; ::std::mutex cv_lock; void signal() override { cv.notify_all(); } size_t size() const override { return sizeof(async_channel_t); } async_channel_t () {} void read(void **target, fibre_t **pcurrent) override { fibre_t *current = *pcurrent; ++current->process->async_count; lock(); fibre_t *w = st_pop_writer(); if(w) { ++refcnt; unlock(); *target = *w->svc_req->io_request.pdata; // transfer data w->process->push(w); // onto active list } else { if(refcnt == 1) { ::std::cerr << "Async channel " << this << " read detects refcnt 1" << ::std::endl; *pcurrent = current->process->pop(); // reset current from active list delete_concrete_object(current,current->process->process_allocator); return; // to prevent signalling a deleted channel } else { --refcnt; st_push_reader(current); unlock(); *pcurrent = current->process->pop(); // active list } } signal(); } void write(void **source, fibre_t **pcurrent) override { fibre_t *current = *pcurrent; ++current->process->async_count; lock(); fibre_t *r = st_pop_reader(); if(r) { ++refcnt; unlock(); *r->svc_req->io_request.pdata = *source; if(r->process == current->process) { current->process->push(current); // current is writer, pushed onto active list *pcurrent = r; // make reader current } else { r->process->push(r); } } else { if(refcnt == 1) { ::std::cerr << "Async channel " << this << " write detects refcnt 1" << ::std::endl; delete_concrete_object(current,current->process->process_allocator); *pcurrent = current->process->pop(); // reset current from active list return; // to prevent signalling a deleted channel } else { --refcnt; st_push_writer(current); // i/o fail: push current onto channel unlock(); *pcurrent = current->process->pop(); // reset current from active list } } signal(); } }; chan_epref_t make_async_channel(alloc_ref_t a) { return acquire_channel(a, new async_channel_t); } #endif
share/lib/rtl/rt/system.hpp
#ifndef SYSTEM #define SYSTEM #include "allocator.hpp" #include "channel.hpp" // For use by the kernel CSP system struct system_t { alloc_ref_t system_allocator; system_t (alloc_ref_t a) : system_allocator(a) {} void connect_sequential (chan_epref_t *left, chan_epref_t *right); void connect_concurrent (chan_epref_t *left, chan_epref_t *right); void connect_async(chan_epref_t *left, chan_epref_t *right); }; #endif
8 CSP Processes
share/lib/rtl/rt/csp_process.hpp
#ifndef CSP_PROCESS #define CSP_PROCESS #include "system.hpp" // active set struct csp_process_t { ::std::atomic_size_t refcnt; system_t *system; alloc_ref_t process_allocator; fibre_t *active; ::std::atomic_flag lock; // this one is a spin lock for sync ops // an async service which pushes a fibre onto the active // set also decrements the active count and must // signal this condition variable to wake up all schedulers // so they notice the active set is now populated ::std::atomic_size_t async_count; ::std::mutex async_lock; // mutex lock for async ops ::std::condition_variable async_wake; void async_complete() { //::std::cerr << "Active set: async complete" << ::std::endl; --async_count; async_wake.notify_all(); } ::std::atomic_size_t running_thread_count; csp_process_t(system_t *s, alloc_ref_t a) : system(s), process_allocator(a), refcnt(1), active(nullptr), async_count(0), lock(false), running_thread_count(0) { // ::std::cerr << "New process" << ::std::endl; } csp_process_t *share() { ++refcnt; return this; } void forget() { --refcnt; if(!atomic_load(&refcnt)) delete_concrete_object(this,system->system_allocator); } // push a new active fibre onto active list void push(fibre_t *fresh) { // ::std::cerr << "Active set push " << fresh << ::std::endl; while(lock.test_and_set(::std::memory_order_acquire)); // spin fresh->next = active; active = fresh; lock.clear(::std::memory_order_release); // release lock } // pop an active fibre off the active list fibre_t *pop() { // ::std::cerr << "Active set pop .. " << ::std::endl; while(lock.test_and_set(::std::memory_order_acquire)); // spin fibre_t *tmp = active; if(tmp)active = tmp->next; lock.clear(::std::memory_order_release); // release lock // ::std::cerr << "Active set popped .. " << tmp << ::std::endl; return tmp; } }; #endif
9 CSP threads
share/lib/rtl/rt/csp_thread.hpp
#ifndef CSP_THREAD #define CSP_THREAD #include <thread> // csp_thread_t4.hpp #include "system.hpp" #include "svc.hpp" #include "csp_process.hpp" // scheduler struct csp_thread_t { csp_process_t *process; // chain of fibres ready to run fibre_t *current; // currently running fibre, nullptr if none ~csp_thread_t() { process->forget(); } csp_thread_t(csp_process_t *a) : current(nullptr), process(a) {} void sync_run(con_t *); void do_read(io_request_t *req); void do_write(io_request_t *req); void do_spawn_fibre(spawn_fibre_request_t *req); void do_spawn_fibre_deferred(spawn_fibre_request_t *req); void do_spawn_process(spawn_process_request_t *req); void do_spawn_cothread(spawn_fibre_request_t *req); }; extern void csp_run(system_t *system, alloc_ref_t process_allocator, con_t *init) { ::std::cerr << "csp_run start" << ::std::endl; csp_thread_t (new(system->system_allocator) csp_process_t(system, process_allocator)).sync_run(init); ::std::cerr << "csp_run over " << ::std::endl; } // scheduler subroutine runs until there is no work to do void csp_thread_t::sync_run(con_t *cc) { current = new(process->process_allocator) fibre_t(cc, process); cc->fibre = current; ++process->running_thread_count; retry: while(current) // while there's work to do { current->svc_req = nullptr; // null out service request svc_req_t *svc_req = current->run_fibre(); if(svc_req) // fibre issued service request switch (svc_req->get_code()) { case read_request_code_e: do_read(&(svc_req->io_request)); break; case write_request_code_e: do_write(&(svc_req->io_request)); break; case spawn_fibre_request_code_e: do_spawn_fibre(&(svc_req->spawn_fibre_request)); break; case spawn_fibre_deferred_request_code_e: do_spawn_fibre_deferred(&(svc_req->spawn_fibre_request)); break; case spawn_process_request_code_e: do_spawn_process(&(svc_req->spawn_process_request)); break; case spawn_cothread_request_code_e: do_spawn_cothread(&(svc_req->spawn_fibre_request)); break; default: assert(false); } else // the fibre returned without issuing a request so should be dead { assert(!current->cc); // check it's adead fibre //::std::cerr << "csp_thread: null continuation in fibre, deleting fibre " << current << ::std::endl; auto old_current = current; current = nullptr; delete_concrete_object(old_current,old_current->process->process_allocator); current = process->pop(); // get more work //::std::cerr << "csp_thread: new current fibre " << current << ::std::endl; } } // decrement running thread count process->running_thread_count--; // Async events can reload the active set, but they do NOT change current rewait: // if the async count > 0 we're waiting for the async op to complete // if the running thread count > 0 we're waiting for other threads to stall // //::std::cerr << "Scheduler out of fibres: async count = " << process->async_count.load() << ::std::endl; if(process->async_count.load() > 0 || process->running_thread_count.load() > 0) { // delay { ////::std::cerr << "Scheduler sleeping (inf)" << ::std::endl; ::std::unique_lock<::std::mutex> lk(process->async_lock); process->async_wake.wait_for(lk,::std::chrono::milliseconds(100000)); } // lock released now current = process->pop(); // get more work if(current) { process->running_thread_count++; goto retry; } goto rewait; } // //::std::cerr << "Scheduler out of work, returning" << ::std::endl; } void csp_thread_t::do_read(io_request_t *req) { req->chan->get()->channel->read(current->svc_req->io_request.pdata, ¤t); } void csp_thread_t::do_write(io_request_t *req) { req->chan->get()->channel->write(current->svc_req->io_request.pdata, ¤t); } void csp_thread_t::do_spawn_fibre(spawn_fibre_request_t *req) { // //::std::cerr << "do spawn" << ::std::endl; current->svc_req=nullptr; process->push(current); con_t *cc= req->tospawn; current = new(process->process_allocator) fibre_t(cc, process); cc->fibre = current; // //::std::cerr << "spawned " << current << ::std::endl; } void csp_thread_t::do_spawn_fibre_deferred(spawn_fibre_request_t *req) { // //::std::cerr << "do spawn deferred" << ::std::endl; current->svc_req=nullptr; con_t *init = req->tospawn; fibre_t *d = new(process->process_allocator) fibre_t(init, process); init->fibre = d; process->push(d); // //::std::cerr << "spawn deferred " << d << ::std::endl; } static void spawn(csp_process_t *pa, con_t *cc) { csp_thread_t(pa).sync_run(cc); } void csp_thread_t::do_spawn_process(spawn_process_request_t *req) { csp_process_t *process = new(process->system->system_allocator) csp_process_t(process->system, req->process_allocator); ::std::thread(spawn,process,req->tospawn).detach(); } void csp_thread_t::do_spawn_cothread(spawn_fibre_request_t *req) { current->process->refcnt++; ::std::thread(spawn,current->process,req->tospawn).detach(); } #endif
10 CSP: the lot
share/lib/rtl/rt/csp.hpp
#ifndef CSP #define CSP #include <typeinfo> // forward decls struct csp_clock_t; struct fibre_t; struct channel_t; struct csp_process_t; struct con_t; struct allocator_t; struct alloc_ref_t; struct system_t; struct channel_endpoint_t; struct chan_epref_t; // the csp system #include "allocator.hpp" #include "malloc_free.hpp" //#include "utility_allocators.hpp" //#include "freelist.hpp" #include "system_allocator.hpp" #include "system.hpp" #include "con.hpp" #include "fibre.hpp" #include "csp_process.hpp" #include "svc.hpp" con_t *coroutine_t::return_control() { //::std::cerr << "Coroutine " << this << " returns control" << ::std::endl; delete_csp_polymorphic_object(this,fibre->process->process_allocator); return nullptr; } con_t *subroutine_t::return_control() { auto tmp = caller; delete_csp_polymorphic_object(this,fibre->process->process_allocator); return tmp; } // resolve circular reference fibre_t::~fibre_t() { //::std::cerr << "Fibre destructor " << this << ::std::endl; while(cc) cc= cc->return_control(); } #include "channel.hpp" #include "csp_thread.hpp" #include "sequential_channel.hpp" #include "concurrent_channel.hpp" #include "async_channel.hpp" //#include "clock.hpp" #define CSP_SUBRETURN return return_control(); #define CSP_COSUICIDE return return_control(); #define CSP_CALLDEF_START \ con_t *call(con_t *caller_a #define CSP_CALLDEF_MID ){\ caller = caller_a;\ pc = 0; #define CSP_CALLDEF_END \ return this;\ } #define CSP_CODEF_START \ con_t *setup( #define CSP_CODEF_MID ){\ pc = 0; #define CSP_CODEF_END \ return this;\ } #define CSP_RESUME_START\ con_t *resume() override {\ switch(pc++){\ case 0: #define CSP_RESUME_END\ default: assert(false);\ }} #define CSP_SIZE \ size_t size() const override { return sizeof(*this); } #define SVC_READ_REQ(xpreq,xpchan,xpdata)\ (xpreq)->svc_code = read_request_code_e;\ (xpreq)->pdata = (void**)xpdata;\ (xpreq)->chan = xpchan; #define SVC_WRITE_REQ(xpreq,xpchan,xpdata)\ (xpreq)->svc_code = write_request_code_e;\ (xpreq)->pdata = (void**)xpdata;\ (xpreq)->chan = xpchan; #define SVC_ASYNC_WRITE_REQ(xpreq,xpchan,xpdata)\ (xpreq)->svc_code = async_write_request_code_e;\ (xpreq)->pdata = (void**)xpdata;\ (xpreq)->chan = xpchan; #define SVC_SPAWN_FIBRE_REQ(xpreq,xcont)\ (xpreq)->svc_code = spawn_fibre_request_code_e;\ (xpreq)->tospawn = xcont; #define SVC_SPAWN_FIBRE_DEFERRED_REQ(xpreq,xcont)\ (xpreq)->svc_code = spawn_fibre_deferred_request_code_e;\ (xpreq)->tospawn = xcont; #define SVC_SPAWN_PTHREAD_REQ(xprec,xcont)\ (xpreq)->spawn_pthread_request_code_e;\ (xpreq)->tospawn = xcont; #define SVC(preq)\ fibre->svc_req = (svc_req_t*)(void*)preq;\ return this; #define CSP_GOTO(caseno)\ pc = caseno;\ return this; /* #define CSP_CALL_DIRECT0(procedure)\ return (new procedure(global))->call(this); #define CSP_CALL_DIRECT1(procedure,arg)\ return (new procedure(global))->call(this,arg); #define CSP_CALL_DIRECT2(procedure,arg1,arg2)\ return (new procedure(global))->call(this,arg1,arg2); #define CSP_CALL_DIRECT3(procedure,arg1,arg2,arg3)\ return (new procedure(global))->call(this,arg1,arg2,arg3); **/ #endif
11 Simple test case 1
$PWD/csp_01.cxx
#include <iostream> #include <cassert> #include <list> #include "csp.hpp" #include "statistics_allocator.hpp" //#include "chips.hpp" // TEST CASE struct hello : coroutine_t { CSP_RESUME_START ::std::cerr << "Hello World" << ::std::endl; CSP_COSUICIDE CSP_RESUME_END CSP_SIZE }; struct producer : coroutine_t { ::std::list<int> *plst; ::std::list<int>::iterator it; chan_epref_t out; union { void *iodata; int value; }; io_request_t w_req; ~producer() { } CSP_CODEF_START ::std::list<int> *plst_a, chan_epref_t outchan_a CSP_CODEF_MID plst = plst_a; out = outchan_a; CSP_CODEF_END CSP_RESUME_START it = plst->begin(); SVC_WRITE_REQ(&w_req,&out,&iodata); case 1: if(it == plst->end()) { CSP_COSUICIDE } value = *it++; pc = 1; SVC(&w_req) CSP_RESUME_END CSP_SIZE }; struct consumer: coroutine_t { ::std::list<int> *plst; union { void *iodata; int value; }; io_request_t r_req; chan_epref_t inp; ~consumer() {} CSP_CODEF_START ::std::list<int> *plst_a, chan_epref_t inchan_a CSP_CODEF_MID plst = plst_a; inp = inchan_a; CSP_CODEF_END CSP_RESUME_START SVC_READ_REQ(&r_req,&inp,&iodata) case 1: SVC(&r_req) case 2: ::std::cerr << "Consumer gets " << value << ::std::endl; plst->push_back(value); CSP_GOTO(1) CSP_RESUME_END CSP_SIZE }; struct square : subroutine_t { int inp; int *pout; square(fibre_t *f) : subroutine_t(f) {} CSP_CALLDEF_START, int *pout_a, int inp_a CSP_CALLDEF_MID pout = pout_a; inp = inp_a; CSP_CALLDEF_END CSP_RESUME_START *pout = inp * inp; { con_t *tmp = caller; delete_concrete_object(this, fibre->process->process_allocator); return tmp; } CSP_RESUME_END CSP_SIZE }; struct transducer: coroutine_t { union { void *iodata; int value; }; io_request_t r_req; io_request_t w_req; chan_epref_t inp; chan_epref_t out; ~transducer() {} CSP_CODEF_START chan_epref_t inchan_a, chan_epref_t outchan_a CSP_CODEF_MID inp = inchan_a; out = outchan_a; CSP_CODEF_END CSP_RESUME_START SVC_READ_REQ(&r_req,&inp,&iodata) SVC_WRITE_REQ(&w_req,&out,&iodata) case 1: SVC(&r_req) case 2: //CSP_CALL_DIRECT2(square,&value,value) return (new(fibre->process->process_allocator) square(fibre))->call(this,&value,value); case 3: pc = 1; SVC(&w_req) CSP_RESUME_END CSP_SIZE }; struct init: coroutine_t { ::std::list<int> *inlst; ::std::list<int> *outlst; spawn_fibre_request_t spawn_req; chan_epref_t ch1out; chan_epref_t ch1inp; chan_epref_t ch2out; chan_epref_t ch2inp; chan_epref_t clock_connection; io_request_t clock_req; double waituntil; double *pwaituntil; ::std::shared_ptr<csp_clock_t> clock; ~init() {} // store parameters in CSP_CODEF_START ::std::list<int> *lin, ::std::list<int> *lout CSP_CODEF_MID inlst = lin; outlst = lout; CSP_CODEF_END CSP_RESUME_START ch1out = make_concurrent_channel(fibre->process->system->system_allocator); ch1inp = ch1out->dup(); ch2out = make_concurrent_channel(fibre->process->system->system_allocator); ch2inp = ch2out->dup(); SVC_SPAWN_FIBRE_DEFERRED_REQ(&spawn_req, (new(fibre->process->process_allocator) producer)->setup(inlst, ch1out)) SVC(&spawn_req) case 1: SVC_SPAWN_FIBRE_DEFERRED_REQ(&spawn_req, (new(fibre->process->process_allocator) transducer)->setup(ch1inp, ch2out)) SVC(&spawn_req) case 2: SVC_SPAWN_FIBRE_DEFERRED_REQ(&spawn_req, (new(fibre->process->process_allocator) consumer)->setup(outlst,ch2inp)) SVC(&spawn_req) case 3: SVC_SPAWN_FIBRE_DEFERRED_REQ(&spawn_req, (new(fibre->process->process_allocator) hello)) SVC(&spawn_req) case 4: /* { clock = make_clock(fibre->process->system); clock->start(); //::std::cerr << "Clock started, time is " << clock->now() << ::std::endl; clock_connection = clock->connect(); //::std::cerr << "Got connection" << ::std::endl; { double rightnow = clock->now(); waituntil = rightnow + 12.10; //::std::cerr << ::std::fixed << "Wait until" << waituntil << " for " << waituntil - rightnow << " seconds" << ::std::endl; } //::std::cerr << "Alarm time " << waituntil << ", stored at " << &waituntil // << "=" << pwaituntil << " which is stored at " << &pwaituntil << ::std::endl; pwaituntil = &waituntil; SVC_WRITE_REQ(&clock_req,&clock_connection,&pwaituntil); ::std::cerr<<"****** INIT Sleeping ********" << ::std::endl; } // too early to signal // need different kind of channel w. cv SVC(&clock_req) // too late to signal case 5: // if this doesn't print, we didn't resume after the sleep correctly **/ ::std::cerr<<"****** INIT Sleep Over ********" << ::std::endl; CSP_COSUICIDE CSP_RESUME_END CSP_SIZE }; // init class #include <iostream> int main() { // create the input list ::std::list<int> inlst; for (auto i = 0; i < 20; ++i) inlst.push_back(i); // empty output list ::std::list<int> outlst; { ::std::vector<mem_req_t> reqs; reqs.push_back(mem_req_t {sizeof(hello),50}); reqs.push_back(mem_req_t {sizeof(producer),50}); reqs.push_back(mem_req_t {sizeof(transducer),50}); reqs.push_back(mem_req_t {sizeof(consumer),50}); reqs.push_back(mem_req_t {sizeof(init),50}); reqs.push_back(mem_req_t {sizeof(square),50}); reqs.push_back(mem_req_t {256,50}); reqs.push_back(mem_req_t {512,50}); ::std::cout << "N reqs = " << reqs.size() << ::std::endl; auto fixed_reqs = fixup(reqs); ::std::cout << "fixed up N reqs = " << fixed_reqs.size() << ::std::endl; for(auto req : fixed_reqs) ::std::cout << req.block_size << " x " << req.n_blocks << ::std::endl; // bootstrap allocator alloc_ref_t malloc_free = malloc_free_allocator_t::create(); // parent C++ allocator /* //alloc_ref_t malloc_free_debugger = new(malloc_free) debugging_allocator_t(malloc_free, malloc_free, "Malloc"); // system allocator alloc_ref_t system_allocator_delegate = new(malloc_free_debugger) system_allocator_t(malloc_free_debugger,malloc_free_debugger, reqs); alloc_ref_t system_allocator_debugger = new(malloc_free_debugger) debugging_allocator_t(malloc_free_debugger, system_allocator_delegate, "Sys"); alloc_ref_t system_allocator = new(malloc_free_debugger) statistics_allocator_t( malloc_free_debugger, system_allocator_debugger, "Sysalloc.stats.txt"); // initial process will use the system allocator alloc_ref_t process_allocator = system_allocator; **/ alloc_ref_t x = system_allocator_t::create(malloc_free, fixed_reqs); alloc_ref_t system_allocator = statistics_allocator_t::create(malloc_free, x, "xstats.txt"); alloc_ref_t process_allocator = system_allocator; // creates the clock too system_t *system = new system_t(system_allocator); ::std::cout << "STARTING" << ::std::endl; csp_run(system, process_allocator, (new(process_allocator) init)->setup(&inlst, &outlst)); ::std::cerr << "RUN COMPLETE" << ::std::endl; delete system; } // the result is now in the outlist so print it ::std::cerr<< "main: +++++++++ List of squares:" << ::std::endl; for(auto v : outlst) ::std::cerr << v << ::std::endl; ::std::cerr<< "main: +++++++++ Done" << ::std::endl; }