ExpandCollapse

+ 1 Thread Control

share/src/pthread/pthread_thread_control.cpp

#include "pthread_thread.hpp"
#include <stdio.h>
#include <cstdlib>
#include <cassert>

#define FLX_SAVE_REGS \
  jmp_buf reg_save_on_stack; \
  setjmp (reg_save_on_stack)


namespace flx { namespace pthread {

world_stop_notifier_t::~world_stop_notifier_t(){}

static void *get_stack_pointer() { 
  void *x; 
  void *y = (void*)&x;
  return y; 
}

// SHOULD BE MUTEX PROTECETD
void thread_control_t::register_world_stop_notifier(world_stop_notifier_t *p)
{
//fprintf(stderr,"World stop notifier registered: %p\n", p);
  ::std::unique_lock<::std::mutex> lck(notification_registry_mutex);
  for (size_t i=0; i<world_stop_notifier_array_length; ++i)
    if(p == world_stop_notifier_array[i]) return;
  world_stop_notifier_array = (world_stop_notifier_t**)realloc(world_stop_notifier_array, 
    sizeof(world_stop_notifier_t*) * (world_stop_notifier_array_length + 1));
  world_stop_notifier_array[world_stop_notifier_array_length] = p;
  ++world_stop_notifier_array_length;
}

// SHOULD BE MUTEX PROTECETD
void thread_control_t::unregister_world_stop_notifier(world_stop_notifier_t *p)
{
  ::std::unique_lock<::std::mutex> lck(notification_registry_mutex);
  size_t i = 0;
  for (i=0; i<world_stop_notifier_array_length; ++i)
    if(p == world_stop_notifier_array[i]) break;
  if (i == world_stop_notifier_array_length) return;
  for (size_t j =  i + 1; j < world_stop_notifier_array_length; ++j)
    world_stop_notifier_array[j-1] = world_stop_notifier_array[j];
  --world_stop_notifier_array_length;
  world_stop_notifier_array = (world_stop_notifier_t**)realloc(world_stop_notifier_array,
    sizeof(world_stop_notifier_t*) * (world_stop_notifier_array_length));
}

void thread_control_t::world_stop_notify()
{
if (world_stop_notifier_array_length > 0)
  //fprintf(stderr, "thread_control_t::world_stop_notify() notifying %zu objects\n",
  //  world_stop_notifier_array_length);
  for (size_t i=0; i<world_stop_notifier_array_length; ++i)
    world_stop_notifier_array[i]->notify_world_stop();
}

bool thread_control_t::get_debug()const { return debug; }

thread_control_base_t::~thread_control_base_t(){}

thread_control_t::thread_control_t (bool d) :
  do_world_stop(false), thread_counter(0), active_counter(0), realtime_thread_counter(0), debug(d),
  world_stop_notifier_array(0), world_stop_notifier_array_length(0)
  {
    if(debug)
      fprintf(stderr,"INITIALISING THREAD CONTROL OBJECT\n");
  }

size_t thread_control_t::thread_count()
  {
    ::std::unique_lock< ::std::mutex> m(stop_mutex);
    return thread_counter;
  }

size_t thread_control_t::active_count()
  {
    ::std::unique_lock< ::std::mutex> m(stop_mutex);
    return active_counter;
  }

void thread_control_t::set_realtime() 
  {
     ++realtime_thread_counter;
  }

thread_data_t *thread_control_t::get_thread_data_pointer()const {
  auto p = threads.find (mythrid());
  assert (p != threads.end());
  return p->second.get();
}

void thread_control_t::add_thread(void *stack_base, thread_kind_t k)
  {
    ::std::unique_lock< ::std::mutex> m(stop_mutex);
    uintptr_t id = mythrid();
    thread_data_t *ptd = new thread_data_t(stack_base,k);
    threads.insert (std::make_pair(id,::std::unique_ptr<thread_data_t>(ptd)));
    ++thread_counter;
    ++active_counter;
    if(debug)
      fprintf(stderr, "Adding thread %p base %p, count=%zu\n", (void*)(uintptr_t)id, stack_base, thread_counter);
    stop_guard.notify_all();
  }

void thread_control_t::remove_thread()
  {
    ::std::unique_lock< ::std::mutex> m(stop_mutex);
    uintptr_t id = mythrid();
    if (threads.erase(id) == 0)
    {
      fprintf(stderr, "Remove thread %p which is not registered\n", (void*)(uintptr_t)id);
      std::abort();
    }
    --thread_counter;
    --active_counter;
    if(debug)
      fprintf(stderr, "Removed thread %p, count=%zu\n", (void*)(uintptr_t)id, thread_counter);
    stop_guard.notify_all();
  }

// stop the world!

// NOTE: ON EXIT, THE MUTEX REMAINS LOCKED

bool thread_control_t::world_stop()
  {
    stop_mutex.lock();
    if(debug)
      fprintf(stderr,"Thread %p Stopping world, active threads=%zu\n", (void*)mythrid(), active_counter);
    if (do_world_stop) {
      stop_mutex.unlock();
      return false; // race! Someone else beat us
    }
    do_world_stop = true;

    // this calls the notify_world_stop() method of all the
    // objects such as condition variables that are registered
    // in the notification list. That method is expected to do a notify_all()
    // on the condition variable.

    world_stop_notify();

    // this is for the thread control objects own condition variable
    // which is used to count the number of threads that have suspended

    stop_guard.notify_all();

    while(active_counter - realtime_thread_counter > 1) {
      if(debug)
        for(
          thread_registry_t::iterator it = threads.begin();
          it != threads.end();
          ++it
        )
        {
          fprintf(stderr, "Thread = %p is %s\n",(void*)(uintptr_t)(*it).first, (*it).second->active? "ACTIVE": "SUSPENDED");
        }
      if(debug)
        fprintf(stderr,"Thread %p Stopping world: begin wait, threads=%zu\n",  (void*)mythrid(), thread_counter);
      stop_guard.wait(stop_mutex);
      if(debug)
        fprintf(stderr,"Thread %p Stopping world: checking threads=%zu\n", (void*)mythrid(), thread_counter);
    }
    // this code has to be copied here, we cannot use 'yield' because
    // it would deadlock ourself
    {
      uintptr_t id = mythrid();
      FLX_SAVE_REGS;
      void *stack_pointer = get_stack_pointer();
      if(debug)
        fprintf(stderr,"World stop thread=%p, stack=%p!\n",(void*)(uintptr_t)id, stack_pointer);
      thread_registry_t::iterator it = threads.find(id);
      if(it == threads.end()) {
        fprintf(stderr,"MAIN THREAD: Cannot find thread %p in registry\n",(void*)(uintptr_t)id);
        abort();
      }
      (*it).second->stack_top = stack_pointer;
      if(debug)
        fprintf(stderr,"Stack size = %zu\n",(size_t)((char*)(*it).second->stack_base -(char*)(*it).second->stack_top));
    }
    if(debug)
      fprintf(stderr,"World STOPPED\n");
    return true; // we stopped the world
  }

// used by mainline to wait for other threads to die
void thread_control_t::join_all()
  {
    ::std::unique_lock< ::std::mutex> m(stop_mutex);
    if(debug)
      fprintf(stderr,"Thread %p Joining all\n", (void*)mythrid());
    while(do_world_stop || thread_counter>1) {
      unsafe_stop_check();
      stop_guard.wait(stop_mutex);
    }
    if(debug)
      fprintf(stderr,"World restarted: do_world_stop=%d, Yield thread count now %zu\n",do_world_stop,thread_counter);
  }

// restart the world
void thread_control_t::world_start()
  {
    if(debug)
      fprintf(stderr,"Thread %p Restarting world\n", (void*)mythrid());
    do_world_stop = false;
    stop_mutex.unlock();
    stop_guard.notify_all();
  }

memory_ranges_t *thread_control_t::get_block_list()
{
  memory_ranges_t *v = new std::vector<memory_range_t>;
  thread_registry_t::iterator end = threads.end();
  for(thread_registry_t::iterator i = threads.begin();
    i != end;
    ++i
  )
  {
    thread_data_t const *ptd = (*i).second.get();
    // !(base < top) means top <= base, i.e. stack grows downwards
    assert(!std::less<void*>()(ptd->stack_base,ptd->stack_top));
    // from top upto base..
    v->push_back(memory_range_t(ptd->stack_top, ptd->stack_base));
  }
  return v;
}

void thread_control_t::suspend()
{
  ::std::unique_lock< ::std::mutex> m(stop_mutex);
  if(debug)
    fprintf(stderr,"[suspend: thread= %p]\n", (void*)mythrid());
  unsafe_suspend();
}

void thread_control_t::resume()
{
  ::std::unique_lock< ::std::mutex> m(stop_mutex);
  if(debug)
    fprintf(stderr,"[resume: thread= %p]\n", (void*)mythrid());
  unsafe_resume();
}


void thread_control_t::unsafe_suspend()
{
  void *stack_pointer = get_stack_pointer();
  uintptr_t id = mythrid();
  if(debug)
    fprintf(stderr,"[unsafe_suspend:thread=%p], stack=%p!\n",(void*)(uintptr_t)id, stack_pointer);
  thread_registry_t::iterator it = threads.find(id);
  if(it == threads.end()) {
    if(debug)
      fprintf(stderr,"[unsafe_suspend] Cannot find thread %p in registry\n",(void*)(uintptr_t)id);
      abort();
  }
  (*it).second->stack_top = stack_pointer;
  (*it).second->active = false;
  if(debug) // VC++ is bugged, doesn't support %td format correctly?
    fprintf(stderr,"[unsafe_suspend: thread=%p] stack base %p > stack top %p, Stack size = %zd\n",
      (void*)(uintptr_t)id,
      (char*)(*it).second->stack_base,
      (char*)(*it).second->stack_top, 
      (size_t)((char*)(*it).second->stack_base -(char*)(*it).second->stack_top));
  --active_counter;
  if(debug)
    fprintf(stderr,"[unsafe_suspend]: active thread count now %zu\n",active_counter);
  stop_guard.notify_all();
  if(debug)
    fprintf(stderr,"[unsafe_suspend]: stop_guard.notify_all() done");
}

void thread_control_t::unsafe_resume()
{
  if(debug)
    fprintf(stderr,"[unsafe_resume: thread %p]\n", (void*)mythrid());
  stop_guard.notify_all();
  if(debug)
    fprintf(stderr,"[unsafe_resume]: stop_guard.notify_all() done");
  while(do_world_stop) stop_guard.wait(stop_mutex);
  if(debug)
    fprintf(stderr,"[unsafe_resume]: stop_guard.wait() done");
  ++active_counter;
  uintptr_t id = mythrid();
  thread_registry_t::iterator it = threads.find(id);
  if(it == threads.end()) {
    if(debug)
      fprintf(stderr,"[unsafe_resume: thread=%p] Cannot find thread in registry\n",(void*)(uintptr_t)id);
      abort();
  }
  (*it).second->active = true;
  if(debug) {
    fprintf(stderr,"[unsafe_resume: thread=%p] resumed, active count= %zu\n",
      (void*)mythrid(),active_counter);
  }
  stop_guard.notify_all();
  if(debug)
    fprintf(stderr,"[unsafe_resume]: stop_guard.notify_all() done");
}

// mutex already held
void thread_control_t::unsafe_stop_check()
{
//fprintf(stderr, "Unsafe stop check ..\n");
  if (do_world_stop)
  {

    if(debug)
      fprintf(stderr,"[unsafe_stop_check: thread=%p] world_stop detected\n", 
        (void*)mythrid());
    FLX_SAVE_REGS;
    unsafe_suspend();
    unsafe_resume();
  }
//fprintf(stderr, "Unsafe stop check finishes\n");
}

void thread_control_t::yield()
{
//fprintf(stderr,"Thread control yield starts\n");
  ::std::unique_lock< ::std::mutex> m(stop_mutex);
  if(debug)
    fprintf(stderr,"[Thread_control:yield: thread=%p]\n", (void*)mythrid());
//fprintf(stderr,"Unsafe stop check starts\n");
  unsafe_stop_check();
//fprintf(stderr,"Unsafe stop check done\n");
}

}}

+ 2 Thread Safe Collector.

The thread safe collector class flx_ts_collector_t is derived from the flx_collector_t class. It basically dispatches to its base with locks as required.

share/lib/rtl/flx_ts_collector.hpp

#ifndef __FLX_TS_COLLECTOR_H__
#define __FLX_TS_COLLECTOR_H__
#include "flx_collector.hpp"
#include "pthread_thread.hpp"
#include <thread>
#include <mutex>

namespace flx {
namespace gc {
namespace collector {

/// Naive thread safe Mark and Sweep Collector.
struct PTHREAD_EXTERN flx_ts_collector_t :
  public flx::gc::collector::flx_collector_t
{
  flx_ts_collector_t(allocator_t *, flx::pthread::thread_control_t *, int _gcthreads, FILE*);
  ~flx_ts_collector_t();

private:
  /// allocator
  void *v_allocate(gc_shape_t *ptr_map, size_t);

  /// collector (returns number of objects collected)
  size_t v_collect();

  // add and remove roots
  void v_add_root(void *memory);
  void v_remove_root(void *memory);

  // statistics
  size_t v_get_allocation_count()const;
  size_t v_get_root_count()const;
  size_t v_get_allocation_amt()const;

private:
  mutable ::std::mutex mut;
};


}}} // end namespaces

#endif

share/src/pthread/flx_ts_collector.cpp

#include "flx_rtl_config.hpp"
#include "flx_ts_collector.hpp"

namespace flx {
namespace gc {
namespace collector {

flx_ts_collector_t::flx_ts_collector_t(allocator_t *a, flx::pthread::thread_control_t *tc,int _gcthreads, FILE *tf) :
  flx_collector_t(a,tc,_gcthreads,tf)
{}

flx_ts_collector_t::~flx_ts_collector_t(){}

void *flx_ts_collector_t::v_allocate(gc_shape_t *ptr_map, size_t x) {
  ::std::unique_lock< ::std::mutex> dummy(mut);
  return impl_allocate(ptr_map,x);
}

size_t flx_ts_collector_t::v_collect() {
  // NO MUTEX
  //if(debug)
  //  fprintf(stderr,"[gc] Request to collect, thread_control = %p, thread %p\n", thread_control, (size_t)flx::pthread::get_current_native_thread());
  return impl_collect();
}

void flx_ts_collector_t::v_add_root(void *memory) {
  ::std::unique_lock< ::std::mutex> dummy(mut);
  impl_add_root(memory);
}

void flx_ts_collector_t::v_remove_root(void *memory) {
  ::std::unique_lock< ::std::mutex> dummy(mut);
  impl_remove_root(memory);
}

size_t flx_ts_collector_t::v_get_allocation_count()const {
  ::std::unique_lock< ::std::mutex> dummy(mut);
  return impl_get_allocation_count();
}

size_t flx_ts_collector_t::v_get_root_count()const {
  ::std::unique_lock< ::std::mutex> dummy(mut);
  return impl_get_root_count();
}

size_t flx_ts_collector_t::v_get_allocation_amt()const {
  ::std::unique_lock< ::std::mutex> dummy(mut);
  return impl_get_allocation_amt();
}


}}} // end namespaces


+ 3 Build System

$PWD/buildsystem/flx_pthread.py

import fbuild
from fbuild.functools import call
from fbuild.path import Path
from fbuild.record import Record
from fbuild.builders.file import copy

import buildsystem
from buildsystem.config import config_call

# ------------------------------------------------------------------------------

def build_runtime(phase):
    print('[fbuild] [rtl] build pthread')
    path = Path(phase.ctx.buildroot/'share'/'src/pthread')

    srcs = Path.glob(path / '*.cpp')
    includes = [
      phase.ctx.buildroot / 'host/lib/rtl', 
      phase.ctx.buildroot / 'share/lib/rtl']
    macros = ['BUILD_PTHREAD']
    flags = []
    libs = [
        call('buildsystem.flx_gc.build_runtime', phase),
    ]
    external_libs = []

    pthread_h = config_call('fbuild.config.c.posix.pthread_h',
        phase.platform,
        phase.cxx.shared)

    dst = 'host/lib/rtl/flx_pthread'
    if pthread_h.pthread_create:
        flags.extend(pthread_h.flags)
        libs.extend(pthread_h.libs)
        external_libs.extend(pthread_h.external_libs)

    return Record(
        static=buildsystem.build_cxx_static_lib(phase, dst, srcs,
            includes=includes,
            macros=macros,
            cflags=flags,
            libs=[lib.static for lib in libs],
            external_libs=external_libs,
            lflags=flags),
        shared=buildsystem.build_cxx_shared_lib(phase, dst, srcs,
            includes=includes,
            macros=macros,
            cflags=flags,
            libs=[lib.shared for lib in libs],
            external_libs=external_libs,
            lflags=flags))

+ 4 Configuration Database

$PWD/src/config/unix/flx_pthread.fpc

Name: Flx_pthread
Description: Felix Pre-emptive threading support

provides_dlib: -lflx_pthread_dynamic
provides_slib: -lflx_pthread_static
includes: '"pthread_thread.hpp"'
Requires: flx_gc flx_exceptions pthread
library: flx_pthread
macros: BUILD_PTHREAD
srcdir: src/pthread
src: .*\.cpp

$PWD/src/config/win/flx_pthread.fpc

Name: Flx_pthread
Description: Felix Pre-emptive threading support

provides_dlib: /DEFAULTLIB:flx_pthread_dynamic
provides_slib: /DEFAULTLIB:flx_pthread_static
includes: '"pthread_thread.hpp"'
Requires: flx_gc flx_exceptions pthread
library: flx_pthread
macros: BUILD_PTHREAD
srcdir: src/pthread
src: .*\.cpp

$PWD/src/config/pthread.fpc

Description: pthread support defaults to no requirements

$PWD/src/config/linux/pthread.fpc

Description: Linux pthread support
requires_dlibs: -lpthread
requires_slibs: -lpthread

share/lib/rtl/flx_pthread_config.hpp

#ifndef __FLX_PTHREAD_CONFIG_H__
#define __FLX_PTHREAD_CONFIG_H__
#include "flx_rtl_config.hpp"
#ifdef BUILD_PTHREAD
#define PTHREAD_EXTERN FLX_EXPORT
#else
#define PTHREAD_EXTERN FLX_IMPORT
#endif
#endif