ExpandCollapse

+ 1 Embedding

This technology is designed to allow Felix to be embedded in any C or C++ program or library.

The embedding library code is used by the core drivers.

+ 1.1 The flx_config class.

The flx_config class stores the configuration data consumed by the subsequent initialisation steps that bring up a Felix world.

share/lib/rtl/flx_world_config.hpp

#ifndef __flx_world_config_H_
#define __flx_world_config_H_

#include "flx_rtl_config.hpp"
#include "flx_gc.hpp"
#include "flx_collector.hpp"
#include "flx_dynlink.hpp"

// for async_sched
#include <list>
#include "flx_async.hpp"
#include "flx_sync.hpp"

namespace flx { namespace run {
// Configuration record for a Felix world.
//
// An object is built from three driver supplied hooks and then filled
// in by init(), which first calls the get_flx_args_config hook and then
// reads the FLX_* environment variables.
//
// Every data member carries a default member initialiser so that an
// object which has not been through init() yet (or whose argument hook
// leaves a field alone) never exposes an indeterminate value; init()
// itself reads `debug` as a default before it assigns it.
class RTL_EXTERN flx_config {
public:
  // Master debug flag (FLX_DEBUG); its value is the default for the
  // finer grained debug flags below.
  bool  debug = false;

  bool debug_threads = false;      // FLX_DEBUG_THREADS
  bool debug_allocations = false;  // FLX_DEBUG_ALLOCATIONS
  bool debug_collections = false;  // FLX_DEBUG_COLLECTIONS
  bool report_collections = false; // FLX_REPORT_COLLECTIONS
  bool report_gcstats = false;     // FLX_REPORT_GCSTATS

  bool debug_driver = false;       // FLX_DEBUG_DRIVER
  bool finalise = false;           // FLX_FINALISE

  size_t gc_freq = 0;              // FLX_GC_FREQ, init() enforces >= 1
  size_t min_mem = 0;              // FLX_MIN_MEM, stored in bytes
  size_t max_mem = 0;              // FLX_MAX_MEM, stored in bytes
  int gcthreads = 0;               // FLX_GCTHREADS

  double free_factor = 0.0;        // FLX_FREE_FACTOR, init() enforces >= 1.1

  bool allow_collection_anywhere = false; // FLX_ALLOW_COLLECTION_ANYWHERE

  // The members below are not assigned by init() itself; presumably
  // they are filled in by the get_flx_args_config hook -- confirm
  // against the driver.
  bool static_link = false;
  char *filename = nullptr; // expected to live forever
  char **flx_argv = nullptr;
  int flx_argc = 0;

  // TODO: fn up in macro area
  // Runs the argument hook, then reads the environment.
  // Returns 0 on success, 1 if the argument hook reports failure.
  int init(int argc, char **argv);

// interface for drivers. there's more, create_frame, etc
  create_async_hooker_t *ptr_create_async_hooker=nullptr;

  typedef ::flx::dynlink::flx_dynlink_t *(*link_library_t)(flx_config *c, ::flx::gc::generic::gc_profile_t*);
  typedef void (*init_ptr_create_async_hooker_t)(flx_config *, bool debug_driver);
  typedef int (*get_flx_args_config_t)(int argc, char **argv, flx_config* c);

  // Driver supplied hooks, set by the constructor.
  link_library_t link_library = nullptr;
  init_ptr_create_async_hooker_t init_ptr_create_async_hooker = nullptr;
  get_flx_args_config_t get_flx_args_config = nullptr;

  flx_config (link_library_t, init_ptr_create_async_hooker_t, get_flx_args_config_t); 


};
}} // namespaces
#endif

share/src/rtl/flx_world_config.cpp

#include "flx_world_config.hpp"
#include <cstdlib>

// Read the environment variable `name` as a floating point number.
// Yields `dflt` when the variable is not set, otherwise whatever
// ::std::atof makes of its text.
static double egetv(char const *name, double dflt)
{
  char const *text = ::std::getenv(name);
  if (text == nullptr)
    return dflt;
  return ::std::atof(text);
}

namespace flx { namespace run {

// =================================================================
// flx_config: Constructor
// =================================================================
flx_config::flx_config 
(
  link_library_t link_library_arg,
  init_ptr_create_async_hooker_t init_ptr_create_async_hooker_arg,
  get_flx_args_config_t get_flx_args_config_arg
) :
  link_library(link_library_arg),
  init_ptr_create_async_hooker(init_ptr_create_async_hooker_arg),
  get_flx_args_config(get_flx_args_config_arg)
{
  //fprintf(stderr,"flx_config constrfuctor\n");
}

// =================================================================
// flx_config: Initialiser
// =================================================================

// Fill in the configuration from the command line and the environment.
//
// argc, argv: the process command line, handed unchanged to the
//   get_flx_args_config hook.
// Returns 1 if the hook reports failure (non-zero), otherwise 0.
//
// After the hook has run, each FLX_* environment variable is read as a
// number (see egetv). The fine grained debug flags default to the value
// of `debug`. When `debug` is set, the chosen values are echoed to stderr.
int
flx_config::init(int argc, char **argv) {
  if(get_flx_args_config(argc, argv, this)) return 1;

  // Convert an environment supplied number to size_t without invoking
  // undefined behaviour: negative values and NaN become 0, values too
  // large for size_t saturate.
  auto to_size = [](double v) -> size_t {
    const size_t biggest = static_cast<size_t>(-1);
    if (!(v > 0.0)) return 0;
    if (v >= static_cast<double>(biggest)) return biggest;
    return static_cast<size_t>(v);
  };

  debug = egetv("FLX_DEBUG", debug) != 0.0;
  if (debug) {
    fprintf(stderr,
      "[FLX_DEBUG] Debug enabled for %s link program\n",
      static_link ? "static" : "dynamic");
  }

  debug_threads = egetv("FLX_DEBUG_THREADS", debug) != 0.0;
  if (debug_threads) {
    fprintf(stderr, "[FLX_DEBUG_THREADS] Threads debug enabled\n");
  }

  debug_allocations = egetv("FLX_DEBUG_ALLOCATIONS", debug) != 0.0;
  if (debug_allocations) {
    fprintf(stderr, "[FLX_DEBUG_ALLOCATIONS] Allocation debug enabled\n");
  }

  debug_collections = egetv("FLX_DEBUG_COLLECTIONS", debug) != 0.0;
  if (debug_collections)
  {
    fprintf(stderr, "[FLX_DEBUG_COLLECTIONS] Collection debug enabled\n");
  }

  report_collections = egetv("FLX_REPORT_COLLECTIONS", debug) != 0.0;
  if (report_collections)
  {
    fprintf(stderr, "[FLX_REPORT_COLLECTIONS] Collection report enabled\n");
  }

  // GC statistics default to the collection report setting, but the
  // message must follow the statistics flag itself.
  report_gcstats = egetv("FLX_REPORT_GCSTATS", report_collections) != 0.0;
  if (report_gcstats)
  {
    fprintf(stderr, "[FLX_REPORT_GCSTATS] GC statistics report enabled\n");
  }


  debug_driver = egetv("FLX_DEBUG_DRIVER", debug) != 0.0;
  if (debug_driver)
  {
    fprintf(stderr, "[FLX_DEBUG_DRIVER] Driver debug enabled\n");
  }

  finalise = egetv("FLX_FINALISE", 0) != 0.0;
  if (debug)
    fprintf(stderr,
      "[FLX_FINALISE] Finalisation %s\n", finalise ? "Enabled" : "Disabled");

  // default collection frequency is 1000 iterations, minimum is 1
  gc_freq = to_size(egetv("FLX_GC_FREQ", 1000));
  if (gc_freq < 1) gc_freq = 1;
  if (debug)
    fprintf(stderr, "[FLX_GC_FREQ] call gc every %zu iterations\n", gc_freq);

  // default min mem is 10 Meg
  min_mem = to_size(egetv("FLX_MIN_MEM", 10) * 1000000.0);
  if (debug)
    fprintf(stderr, "[FLX_MIN_MEM] call gc only if more than %zu Meg heap used\n", min_mem/1000000);

  // default max mem is unlimited (0 selects the largest size_t value)
  max_mem = to_size(egetv("FLX_MAX_MEM", 0) * 1000000.0);
  if (max_mem == 0) max_mem = static_cast<size_t>(-1);
  if (debug)
    fprintf(stderr, "[FLX_MAX_MEM] terminate if more than %zu Meg heap used\n", max_mem/1000000);

  // default free factor is 10%, this is also the minimum allowed
  free_factor = egetv("FLX_FREE_FACTOR", 1.1);
  if (free_factor < 1.1) free_factor = 1.1;
  if (debug)
    fprintf(stderr, "[FLX_FREE_FACTOR] reset gc trigger %4.2f times heap used after collection\n", free_factor);

  // experimental flag to allow collection anywhere
  // later, we default this one to true if we can
  // find all the thread stacks, which should be possible
  // with gcc and probably msvc++

  allow_collection_anywhere = egetv("FLX_ALLOW_COLLECTION_ANYWHERE", 1) != 0.0;
  if (debug)
    fprintf(stderr, "[FLX_ALLOW_COLLECTION_ANYWHERE] %s\n", allow_collection_anywhere ? "True" : "False");

  gcthreads = static_cast<int>(egetv("FLX_GCTHREADS",0));
  if (debug)
    fprintf(stderr, "[FLX_GCTHREADS] %d\n",gcthreads);

  if (debug) {
    for (int i=0; i<flx_argc; ++i)
      fprintf(stderr, "flx_argv[%d]->%s\n", i, flx_argv[i]);
  }
  return 0;
}

}} // namespaces

+ 1.2 The flx_world class.

Objects of the flx_world class are used to represent a Felix world.

share/lib/rtl/flx_world.hpp

#ifndef __flx_world_H_
#define __flx_world_H_
#include "flx_rtl_config.hpp"

#include "flx_gc.hpp"
#include "flx_collector.hpp"
#include "flx_dynlink.hpp"

// for async_sched
#include <list>
#include "flx_async.hpp"
#include "flx_sync.hpp"
#include "flx_world_config.hpp"
#include "flx_async_world.hpp"

namespace flx { namespace run {

// A Felix world: the allocator, garbage collector, linked library,
// library instance and schedulers that together run one Felix program.
//
// Life cycle: construct with a flx_config, call setup(), call run()
// (possibly repeatedly, bracketed by begin_flx_code()/end_flx_code()),
// then call teardown() before deleting the object.
//
// The private members are only given real values by setup(). They carry
// default member initialisers so that they hold well defined values
// (false / nullptr) if setup() has not run or returned early.
class RTL_EXTERN flx_world {
  bool debug = false;
  bool debug_driver = false;

  ::flx::gc::generic::allocator_t *allocator = nullptr;

  ::flx::gc::collector::flx_collector_t *collector = nullptr;

  ::flx::gc::generic::gc_profile_t *gcp = nullptr;

  ::flx::dynlink::flx_dynlink_t *library = nullptr;
  ::flx::dynlink::flx_libinst_t *instance = nullptr;

  struct async_sched *async_scheduler = nullptr;

  int explicit_dtor();
public:
  // Configuration attached by the constructor.
  flx_config *c;
  flx_world(flx_config *); 

  // Initialises the world from the command line; returns 0 on success,
  // non-zero if configuration failed.
  int setup(int argc, char **argv);

  // Destroys the world; returns the result of the explicit destructor.
  int teardown();

  // add/remove (current pthread, stack pointer) for garbage collection
  void begin_flx_code();
  void end_flx_code();

  // returns number of pending operations scheduled by svc_general
  // return error code < 0 otherwise
  // catches all known exceptions
  int run();

  // Requires a successful setup(): dereferences the library instance.
  void* ptf()const { return instance->thread_frame; } // for creating con_t

  async_hooker *create_demux();

  void spawn_fthread(::flx::rtl::con_t *top);

  void external_multi_swrite (::flx::rtl::schannel_t *chan, void *data);

  async_sched *get_async_scheduler()const { return async_scheduler; }
  // Requires a successful setup(): dereferences the async scheduler.
  sync_sched *get_sync_scheduler()const { return async_scheduler->ss; }
};

// Plain record holding the command line, three FILE streams, the
// collector profile and the owning world.
// NOTE(review): presumably this is the common leading part of every
// thread frame, filled in when the library instance is created --
// confirm against flx_libinst_t::create before reordering any field.
struct base_thread_frame_t {
  int argc;                               // number of entries in argv
  char **argv;                            // program arguments
  FILE *flx_stdin;                        // input stream
  FILE *flx_stdout;                       // output stream
  FILE *flx_stderr;                       // error stream
  ::flx::gc::generic::gc_profile_t *gcp;  // collector profile
  flx_world *world;                       // owning world
};


}} // namespaces
#endif //__flx_world_H_

share/src/rtl/flx_world.cpp

#include "flx_world.hpp"
#include "flx_eh.hpp"
#include "flx_ts_collector.hpp"
#include "flx_rtl.hpp"

using namespace ::std;
using namespace ::flx::rtl;
using namespace ::flx::pthread;
using namespace ::flx::run;

namespace flx { namespace run {

// =================================================================
// flx_world : final cleanup
// =================================================================

// terminates process!
// Not called by default (let the OS clean up)

static int do_final_cleanup(
  bool debug_driver,
  flx::gc::generic::gc_profile_t *gcp,
  ::flx::dynlink::flx_dynlink_t *library,
  ::flx::dynlink::flx_libinst_t *instance
)
{
  flx::gc::generic::collector_t *collector = gcp->collector;

  // garbage collect application objects
  {
    if (debug_driver || gcp->debug_collections)
      fprintf(stderr, "[do_final_cleanup] Finalisation: pass 1 Data collection starts ..\n");

    size_t n = collector->collect();
    size_t a = collector->get_allocation_count();

    if (debug_driver || gcp->debug_collections)
      fprintf(stderr, "[do_final_cleanup] flx_run collected %zu objects, %zu left\n", n, a);
  }

  // garbage collect system objects
  {
    if (debug_driver || gcp->debug_collections)
      fprintf(stderr, "[do_final_cleanup] Finalisation: pass 2 Final collection starts ..\n");

    collector->free_all_mem();
    size_t a = collector->get_allocation_count();

    if (debug_driver || gcp->debug_collections)
      fprintf(stderr, "[do_final_cleanup] Remaining %zu objects (should be 0)\n", a);

    if (a != 0){
      fprintf(stderr, "[do_final_cleanup] flx_run %zu uncollected objects, should be zero!! return code 5\n", a);
      return 5;
    }
  }

  if (debug_driver)
    fprintf(stderr, "[do_final_cleanup] exit 0\n");

  return 0;
}

// Returns the address of its own local variable `x`, i.e. an address
// inside this function's stack frame. Callers in this file pass the
// result to thread_control->add_thread(); the pointer must never be
// dereferenced, since `x` no longer exists once the function returns.
// NOTE(review): the result depends on the compiler not inlining or
// rewriting the self-referential initialisation -- leave as written.
static void *get_stack_pointer() { void *x=(void*)&x; return x; }

// =================================================================
// flx_world : run mainline pthread constructor
// =================================================================
// RUN A FELIX INSTANCE IN THE CURRENT PTHREAD
//
// CURRENTLY ONLY CALLED ONCE IN MAIN THREAD
// RETURNS A LIST OF FTHREADS
// 

// Build the initial fibre list for the library instance.
//
// A fresh fthread_list is allocated through `gcp`. For each non-null
// entry point of `instance` -- main_proc first, then start_proc -- a
// fibre is created and pushed onto the front of the list, so the
// start_proc fibre is pushed last.
// Returns the list (possibly empty).
static fthread_list*
run_felix_pthread_ctor(
  flx::gc::generic::gc_profile_t *gcp,
  ::flx::dynlink::flx_libinst_t *instance)
{
  fthread_list *const fibres =
    new(*gcp, ::flx::run::fthread_list_ptr_map,false) fthread_list(gcp);

  // Wrap one entry point in a fibre and queue it; null entries are skipped.
  auto enqueue = [&](con_t *entry) {
    if (!entry) return;
    fthread_t *fibre = new (*gcp, _fthread_ptr_map, false) fthread_t(entry, fibres);
    fibres->push_front(fibre);
  };

  enqueue(instance->main_proc);   // flx_main entry point
  enqueue(instance->start_proc);  // flx_start (initialisation) entry point

  return fibres;
}

// =================================================================
// flx_world : run mainline pthread destructor
// Joins all threads to main thread
// Detaches Felix instance by unrooting it
// Collects all garbage if finalisation set
// =================================================================
static void run_felix_pthread_dtor(
  bool debug_driver,
  flx::gc::generic::gc_profile_t *gcp,
  ::flx::dynlink::flx_dynlink_t *library,
  ::flx::dynlink::flx_libinst_t *instance
)
{
  if (debug_driver)
    fprintf(stderr, "[run_felix_pthread_dtor] MAIN THREAD FINISHED: waiting for other threads\n");

  gcp->collector->get_thread_control()->join_all();

  if (debug_driver) 
    fprintf(stderr, "[run_felix_pthread_dtor] ALL THREADS DEAD: mainline cleanup!\n");

  if (debug_driver) {
    flx::gc::generic::collector_t *collector = gcp->collector;

    size_t uncollected = collector->get_allocation_count();
    size_t roots = collector->get_root_count();
    fprintf(stderr,
      "[run_felix_pthread_dtor] program finished, %zu collections, %zu uncollected objects, roots %zu\n",
      gcp->collections, uncollected, roots);
  }
  gcp->collector->remove_root(instance);

  if (gcp->finalise)
    (void)do_final_cleanup(debug_driver, gcp, library, instance);

  if (debug_driver) 
    fprintf(stderr, "[run_felix_pthread_dtor] mainline cleanup complete, exit\n");
   
}

// =================================================================
// flx_world : Constructor
// attach flx_config and exit
// =================================================================
flx_world::flx_world(flx_config *c_arg) : c(c_arg) {}

// =================================================================
// flx_world : setup
// 
// This is the actual world constructor. It is separated from
// the C++ constructor to allow a pointer to the world to be stashed
// by the user somewhere, prior to actually initialising Felix
//
// =================================================================
//
// attach flx_config and exit
// calls init passing command line arguments
// exits if init returns non-zero
// otherwise, sets up flx_world data based on flx_config attached by constructor
// creates allocator
// creates thread control object 
// creates garbage collector
// creates collector profile object
// links to the main library control object and makes it a root
// adds current thread to thread control object 
// creates instance of library and makes it a root:
//   *  loads the dll (if using dynamic loading)
//   *  finds the thread frame constructor in the library
//   *  invokes the thread frame constructor to create the thread frame
//   *  initialises the common part of the thread frame (with command line args etc)
// removes thread from thread control object
// sets up synchronous scheduler (by calling run_felix_pthread_ctor):
//   * finds primary entry point
//   * creates initial continuation (in a suspended state)
//   * create initial fibre from that continuation
//   * creates sync scheduler list and initialises it with initial fibre 
// creates async scheduler
// NOTE: does not run scheduler(s)
// =================================================================

// Initialise the world from the command line.
//
// argc, argv: process command line, forwarded to flx_config::init().
// Returns the non-zero result of init() if configuration fails,
// otherwise 0 once the allocator, thread control object, collector,
// collector profile, library, library instance and async scheduler
// have been created. The schedulers are created but not run.
int flx_world::setup(int argc, char **argv) {
  // Keep the real return code of init(): assignment and comparison are
  // done as separate steps so the comparison result is not what gets stored.
  const int res = c->init(argc, argv);
  if (res != 0) return res;

  debug = c->debug;
  debug_driver = c->debug_driver;

  if(debug_driver)
    fprintf(stderr, "[flx_world %p: setup]\n", this);

  allocator = new flx::gc::collector::malloc_free();
  if(debug_driver)
    fprintf(stderr, "[flx_world: setup] Created allocator %p\n", allocator);
  allocator->set_debug(c->debug_allocations);

  // Optional allocation tracing: wrap the allocator if the trace file opens.
  {
    char *alloc_trace = getenv("FLX_TRACE_ALLOCATIONS");
    if(alloc_trace && alloc_trace[0] != '\0') {
      FILE *f = fopen(alloc_trace,"w");
      if(f) {
        fprintf(stderr, "Allocation tracing active, file = %s\n",alloc_trace);
        allocator = new flx::gc::collector::tracing_allocator(f,allocator);
      }
      else
        fprintf(stderr, "Unable to open allocation trace file %s for output (ignored)\n",alloc_trace);
    }
  }

  // previous direct ctor scope ended at closing brace of FLX_MAIN
  // but delete can probably be moved up after collector delete (also used by explicit_dtor)
  ::flx::pthread::thread_control_t *thread_control = new ::flx::pthread::thread_control_t(c->debug_threads);
  if(debug_driver)
    fprintf(stderr, "[flx_world: setup] Created thread control object  %p\n", thread_control);

  // NB: !FLX_SUPPORT_ASYNC refers to async IO, hence ts still needed thanks to flx pthreads
  // Optional GC tracing: tracefile stays NULL unless the file opens.
  FILE *tracefile = NULL;
  {
    char *gc_trace = getenv("FLX_TRACE_GC");
    if(gc_trace && gc_trace[0] != '\0') {
      tracefile = fopen(gc_trace,"w");
      if(tracefile)
        fprintf(stderr, "GC tracing active, file = %s\n",gc_trace);
      else
        fprintf(stderr, "Unable to open GC trace file %s for output (ignored)\n",gc_trace);
    }
  }

  // Create Garbage Collector
  collector = new flx::gc::collector::flx_ts_collector_t(
    allocator, 
    thread_control, 
    c->gcthreads, tracefile
  );
  collector->set_debug(c->debug_collections, c->report_gcstats);
  if(debug_driver)
    fprintf(stderr, "[flx_world: setup] Created ts collector %p\n", collector);

  // Create Collector Profile
  gcp = new flx::gc::generic::gc_profile_t(
    c->debug_driver,
    c->debug_allocations,
    c->debug_collections,
    c->report_collections,
    c->report_gcstats,
    c->allow_collection_anywhere,
    c->gc_freq,
    c->min_mem,
    c->max_mem,
    c->free_factor,
    c->finalise,
    collector
  );

  if(debug_driver)
    fprintf(stderr, "[flx_world: setup] Created gc profile object gcp=%p\n",gcp);

  library = c->link_library(c,gcp);
  collector->add_root (library);

  if(debug_driver)
    fprintf(stderr, "[flx_world: setup] Created library object %p\n", library);

  if (debug_driver)
  {
    // flx_argv holds flx_argc entries, so that count (not the process
    // argc) bounds every access to it.
    fprintf(stderr, "[flx_world:setup] flx_run driver begins argv[0]=%s\n",
      c->flx_argc > 0 ? c->flx_argv[0] : "(none)");
    for (int i=1; i<c->flx_argc; ++i)
      fprintf(stderr, "[flx_world:setup]                       argv[%d]=%s\n", i,c->flx_argv[i]);
  }

  // flx_libinst_t::create can run code, so add thread to avoid world_stop abort
  thread_control->add_thread(get_stack_pointer(), ::flx::pthread::mainline);

  // Create the usercode driver instance
  // NB: seems to destroy()ed in do_final_cleanup
  instance = new (*gcp, ::flx::dynlink::flx_libinst_ptr_map, false) ::flx::dynlink::flx_libinst_t(debug_driver);
  collector->add_root(instance);
  instance->create(
    library,
    gcp,
    this,
    c->flx_argc,
    c->flx_argv,
    stdin,
    stdout,
    stderr,
    debug_driver);

  thread_control->remove_thread();

  if (debug_driver) {
    fprintf(stderr, "[flx_world:setup] loaded library %s at %p\n", c->filename, library->library);
    fprintf(stderr, "[flx_world:setup] thread frame at %p\n", instance->thread_frame);
    fprintf(stderr, "[flx_world:setup] initial continuation at %p\n", instance->start_proc);
    fprintf(stderr, "[flx_world:setup] main continuation at %p\n", instance->main_proc);
    fprintf(stderr, "[flx_world:setup] creating async scheduler\n");
  }

  // FIXME: this doesn't belong in this subroutine
  // The above stuff sets universal variables
  // the below stuff sets variables that ONLY apply
  // to the mainline thread
  auto schedlist = run_felix_pthread_ctor(gcp, instance);

  async_scheduler = new (*gcp,async_sched_ptr_map,false) async_sched(
    this,
    debug_driver,
    gcp, schedlist
    ); // deletes active for us!

  return 0;
}

// =================================================================
// flx_world : Explicit Destructor
// Destroys current Felix world.
// However the Allocator and GC are retained.

// Calls run_felix_pthread_dtor:
//   *  Joins all threads to main thread
//   *  Detaches Felix instance by unrooting it
//   *  Collects all garbage if finalisation set
//
// Finalises system, running GC if required (AGAIN!)
// =================================================================
// Shut the running world down via run_felix_pthread_dtor() and report,
// when debugging, whether finalisation ran or was skipped.
// Always returns 0.
int flx_world::explicit_dtor()
{
  if (debug_driver)
    fprintf(stderr, "[explicit_dtor] entry\n");

  run_felix_pthread_dtor(debug_driver, gcp, library, instance);

  if (gcp->finalise)
  {
    if (debug_driver)
      fprintf(stderr, "[explicit_dtor] flx_run driver ends with finalisation complete\n");
  }
  else if (debug_driver || gcp->debug_collections)
  {
    // Finalisation was skipped: say how many objects are still allocated.
    const size_t still_allocated = gcp->collector->get_allocation_count();
    fprintf(stderr,
      "[explicit_dtor] flx_run driver ends with finalisation skipped, %zu uncollected "
        "objects\n", still_allocated);
  }

  if (debug_driver)
    fprintf(stderr, "[explicit_dtor] exit 0\n");

  return 0;
}

// =================================================================
// flx_world : Teardown
//
// IRREVERSIBLY DESTROYS THE WORLD. Kills the allocator, collector,
// collector profile and thread control object.
//
// This routine is a proxy for the destructor, it should be called
// explicitly before deleting the world object.
// it returns 0 if the explicit destructor succeeded, non-zero otherwise.
// =================================================================
int flx_world::teardown() {
  if (debug_driver)
    fprintf(stderr, "[teardown] entry\n");

  thread_control_base_t *thread_control = collector->get_thread_control();
  thread_control->add_thread(get_stack_pointer(), ::flx::pthread::mainline);

  // could this override error_exit_code if something throws?
  int error_exit_code = explicit_dtor();
  if (debug_driver)
    fprintf(stderr,"[teardown] explicit dtor run code %d\n", error_exit_code);

  instance=0;
  library=0;
  if (debug_driver)
    fprintf(stderr,"[teardown] library & instance NULLED\n");

  // And we're done, so start cleaning up.
  delete gcp;

  delete collector;
  if (debug_driver) 
    fprintf(stderr,"[teardown] collector deleted\n");

  delete allocator;
  if (debug_driver) 
    fprintf(stderr,"[teardown] allocator deleted\n");

  if (debug_driver) 
    fprintf(stderr, "[teardown] flx_run driver ends code=%d\n", error_exit_code);

  delete thread_control;  // RF: cautiously delete here
  if (debug_driver) 
    fprintf(stderr,"[teardown] thread control deleted\n");
  return error_exit_code;
}


// =================================================================
// Suspension control.
//
// In a user event loop, when Felix main thread returns,
// the thread must be removed from the thread control object.
// Prior to resuming the suspended system, the current thread
// must be registered with the thread control object again.
//
// The reason is that there may be other threads running, even
// though the main thread has exited to allow the user event
// loop to check for and service events.
//
// Should one of these threads trigger a garbage collection,
// the thread control object is used to signal all the other threads 
// that they must stop, and then waits until they do so.
//
// Waiting for the main thread would cause a lockup, since it is 
// running the user event loop, not Felix code, and therefore 
// cannot yield. In fact it has already yielded to the user
// event loop .. so these two routines are used to ensure that
// the thread is only registered when it can actually respond
// to a world stop request.
//
// =================================================================
// flx_world : Resume Felix
// =================================================================
// Register the calling thread (with its current stack position) as the
// mainline thread with the collector's thread control object.
void flx_world::begin_flx_code() {
  auto *tc = collector->get_thread_control();
  void *stack_mark = get_stack_pointer();
  tc->add_thread(stack_mark, ::flx::pthread::mainline);
}

// =================================================================
// flx_world :  Suspend Felix
// =================================================================
// Remove the calling thread from the collector's thread control object.
void flx_world::end_flx_code() {
  auto *tc = collector->get_thread_control();
  tc->remove_thread();
}


// =================================================================
// flx_world :  Run Felix Mainline
// =================================================================
// Run the async scheduler and translate any escaping exception into a
// negative status code.
//
// Returns the value of async_scheduler->prun() when nothing is thrown.
// Otherwise: the negated handler result for flx_exception_t and
// std::exception, the negated value for a thrown int, -1 for a thrown
// string, -6 for a thrown continuation, and -5 for anything else.
int flx_world::run() {
  // this may not be called on the same thread, so let thread control know
  // when we exit, main thread is not running so pthreads can garbage collect without waiting for us
  int status;

  try
  {
    status = async_scheduler->prun();
  }
  catch (flx_exception_t &x)
  {
    status = - flx_exception_handler (&x);
  }
  catch (std::exception &x)
  {
    status = - std_exception_handler (&x);
  }
  catch (int &x)
  {
    fprintf (stderr, "Exception type int: %d\n", x);
    status = -x;
  }
  catch (::std::string &x)
  {
    fprintf (stderr, "Exception type string : %s\n", x.c_str());
    status = -1;
  }
  catch (::flx::rtl::con_t &x)
  {
    fprintf (stderr, "Rogue continuatiuon caught\n");
    status = -6;
  }
  catch (...)
  {
    fprintf(stderr, "[flx_world:run_until_complete] Unknown exception in thread!\n");
    status = -5;
  }

  return status;
}


// =================================================================
// flx_world :  Spawn fibre hook
// =================================================================
// TODO: factor into async_sched. run_felix_pthread_ctor does this twice
// Wrap the continuation `top` in a new fibre and push it onto the
// front of the synchronous scheduler.
void flx_world::spawn_fthread(con_t *top) {
  auto scheduler = get_sync_scheduler();
  fthread_t *fibre =
    new (*gcp, _fthread_ptr_map, false) fthread_t(top, scheduler->active);
  scheduler->push_front(fibre);
}

// =================================================================
// flx_world :  External multiwrite hook
// =================================================================
// Forward an external multi-write of `data` on channel `chan` to the
// async scheduler.
void flx_world::external_multi_swrite(schannel_t *chan, void *data)
{
  async_sched *const scheduler = async_scheduler;
  scheduler->external_multi_swrite(chan, data);
}

// =================================================================
// flx_world :  Create Demux event polling system and thread
// =================================================================
// Create a new asynchronous event manager on demand.
//
// If the configuration has no async hooker factory yet, the driver's
// init_ptr_create_async_hooker hook is asked to provide one; if it is
// still missing afterwards the process is terminated with exit(1).
// Returns the hooker produced by the factory.
async_hooker *flx_world::create_demux() 
{
  if(debug_driver)
    fprintf(stderr,"[create_demux]: svc_general] trying to create async system..\n");

  if (c->ptr_create_async_hooker == NULL)
  {
    if(debug_driver)
      fprintf(stderr,"[create_demux: svc_general] trying to create async hooker..\n");
    c->init_ptr_create_async_hooker(c,debug_driver);

    // Error out if we still don't have the hooker function.
    if (c->ptr_create_async_hooker == NULL)
    {
      fprintf(stderr,
        "[create_demux: svc_general] Unable to initialise async I/O system: terminating\n");
      exit(1);
    }
  }

  // CREATE A NEW ASYNCHRONOUS EVENT MANAGER
  // DONE ON DEMAND ONLY
  return (*c->ptr_create_async_hooker)(
    gcp->collector->get_thread_control(), // thread_control object
    20000, // bound on resumable thread queue
    50,    // bound on general input job queue
    2,     // number of threads in job pool
    50,    // bound on async fileio job queue
    1      // number of threads doing async fileio
  );
}


}} // namespaces