
+ 1 Pthread Synopsis

share/lib/std/pthread/__init__.flx

  
  // pthreads (portable)
  include "std/pthread/pthread";
  //include "std/pthread/pchannels";
  include "std/pthread/mutex";
  //include "std/pthread/ts_bound_queue";
  //include "std/pthread/semaphore";
  //include "std/pthread/condition_variable";
  //include "std/pthread/ppipe";
  //include "std/pthread/forkjoin";
  //include "std/pthread/atomic";
  //include "std/pthread/threadpool";
  

+ 2 Pthreads.

General support for pre-emptive threading, also known as shared memory concurrency. The core routines are based on the POSIX C interface; emulations are provided for Windows.

The core support routines are written in C++. Adaptation to the local platform operating system is done in C++ using configuration data provided by the Felix configuration scripts.

Felix pthreads are always detached. It is not possible to directly wait on a pthread, kill a pthread, or join a pthread. Pchannels or other devices such as mutex locks, semaphores or condition variables must be used for synchronisation instead.
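
For example, since a pthread cannot be joined, the spawner can wait for a result on a pchannel (described in the next section). A minimal sketch, assuming only spawn_pthread and the pchannel operations shown below:

  include "std/pthread/pchannels";
  
  var ch = mk_pchannel[int]();       // channel used to collect the result
  spawn_pthread {
    // ... do some work in the detached pthread ...
    write (ch, 42);                  // hand the result back to the spawner
  };
  var result = read ch;              // blocks until the worker writes
  println$ "worker sent " + result.str;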

share/lib/std/pthread/pthread.flx

  
  header pthread_hxx = '#include "pthread_thread.hpp"';
  header mutex_hxx = '#include "pthread_mutex.hpp"';
  header condv_hxx = '#include "pthread_condv.hpp"';
  header monitor_hxx = '#include "pthread_monitor.hpp"';
  header work_fifo_hxx = '#include "pthread_work_fifo.hpp"';
  
  //$ This class provides access to the operating system's native
  //$ threading routines. On systems with multiple cpus, this may
  //$ increase performance as the operating system may schedule
  //$ threads on different processors.
  open class Pthread
  {
    requires package "flx_pthread";
  
    //$ Spawn a detached pthread.
    proc spawn_pthread(p:1->0)
    {
        var con = start p;              // get continuation of p
        var fthr = mk_thread con;
        svc$ svc_spawn_pthread fthr;
    }
    //$ Spawn a detached pthread sharing active list with spawner.
    proc spawn_process(p:1->0)
    {
        var con = start p;              // get continuation of p
        var fthr = mk_thread con;
        svc$ svc_spawn_process fthr;
    }
    proc thread_yield : 1 = "ptf->gcp->collector->get_thread_control()->yield();";
  }
  

+ 3 Pchannels.

A pchannel is a monitor object used to synchronise pthreads via read and write procedures that transfer a pointer to a heap allocated object. Ownership is transferred from the writer to the reader.

After the initial synchronisation the reader gains control and takes possession of the data. The reader then signals that the writer may proceed. The control interlock ensures that the reader can capture the data from the writer without the writer interfering. This may be necessary if the value needs to be deep copied, for example. The monitor data exchange protocol is designed to permit transfer of data on the writer's machine stack, or data which the writer may modify after regaining control. However, the read and write operations on pchannels automatically copy the data onto the heap and perform the synchronisation.

Pchannels should be used carefully because they block the whole pthread, that is, all fibres running on it. Unlike a fibre deadlock, a pthread deadlock cannot be resolved and should generally be considered a programming error.
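
The sketch below shows the typical pattern: two pthreads exchange a value through a connected input/output pchannel pair. The writer blocks until the reader has taken possession of the (heap copied) value, after which both proceed concurrently.

  include "std/pthread/pchannels";
  
  var inp, out = mk_iopchannel_pair[string] ();
  spawn_pthread {
    write (out, "hello");            // blocks until a reader synchronises
    println$ "writer: value delivered";
  };
  spawn_pthread {
    var msg = read inp;              // blocks until a writer synchronises
    println$ "reader: got " + msg;
  };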

share/lib/std/pthread/pchannels.flx

  
  //$ Pchannels are unbuffered synchronisation points
  //$ for pre-emptive threads.
  //$ Similarly to schannels, paired reader-writer pthreads
  //$ cannot proceed until both parties agree data exchange is complete.
  //$ Unlike schannels, both reader and writer can subsequently
  //$ continue concurrently after the exchange.
  open class Pchannel
  {
    requires package "flx_pthread";
  
    //$ Pre-emptive thread channels (monitor).
    type pchannel[t] = "flx::pthread::monitor_t*" requires monitor_hxx;
    //$ Pre-emptive thread input channel.
    type ipchannel[t] = "flx::pthread::monitor_t*" requires monitor_hxx;
    //$ Pre-emptive thread output channel.
    type opchannel[t] = "flx::pthread::monitor_t*" requires monitor_hxx;
  
    //$ Make bidirectional pchannel.
    fun mk_pchannel[t]: 1->pchannel[t] = "new flx::pthread::monitor_t(ptf->gcp->collector->get_thread_control())";
  
    //$ Safe cast from bidirectional to output pchannel.
    ctor[t] opchannel[t](x:pchannel[t]) => C_hack::cast[opchannel[t]] x;
    //$ Safe cast from bidirectional to input pchannel.
    ctor[t] ipchannel[t](x:pchannel[t]) => C_hack::cast[ipchannel[t]] x;
  
    //$ Make an input and an output pchannel out of a bidirectional channel.
    fun mk_iopchannel_pair[t](var ch:pchannel[t]) =>
      ipchannel[t] ch, opchannel[t] ch
    ;
  
    //$ Construct a connected input and output pchannel pair.
    fun mk_iopchannel_pair[t]() =>
      mk_iopchannel_pair[t]$ mk_pchannel[t] ()
    ;
  
  
    // NOTE: read/write on pchannels uses suspend/resume
    // to tell any pending collector it is safe to proceed
    // whilst it is doing the I/O (which may block),
    // to block returning from the I/O during a collection
    // AND, if the I/O completed before the collection got
    // going, to yield at this point.
  
    //$ Read from a pchannel.
    proc _read[t]: pchannel[t] * &&t = """
      {
      //fprintf(stderr,"READ:DQ\\n");
      *$2 = (?1*)($1->dequeue());
      ptf->gcp->collector->remove_root(*$2);
      //fprintf(stderr,"DONE READ:DQ\\n");
      }
    """ requires property "needs_ptf";
  
    noinline gen read[t] (chan:pchannel[t]) = {
      var p : &t;
      _read (chan,  &p);
      return *p;
    }
    gen read[t] (chan:ipchannel[t]) => read$ C_hack::cast[pchannel[t]] chan;
  
    //$ Write to a pchannel.
    proc _write[t]: pchannel[t] * &t = """
      {
      //fprintf(stderr,"WRITE:NQ\\n");
      ptf->gcp->collector->add_root($2);
      $1->enqueue((void*)$2);
      //fprintf(stderr,"DONE WRITE:NQ\\n");
      }
    """ requires property "needs_ptf";
  
    noinline proc write[t](chan:pchannel[t], v:t) {
      var ps = unbox$ new v;
      _write (chan,ps);
    }
    proc write[t] (chan:opchannel[t], v:t) { write$ C_hack::cast[pchannel[t]] chan,v; }
  }
  

+ 4 Ppipes.

share/lib/std/pthread/ppipe.flx

  
  //$ Asynchronous Synchronous Pipe.
  //$ Used to link pthreads.
  open class Ppipe {
  
    //$ Send a stream down a channel.
    proc psource[T] (var it:1 -> T) (out:opchannel[T]) 
    {
      while true do write (out,#it); done 
    }
  
    //$ pisrc converts a streamable data structure
    //$ such as an array into a source.
    proc pisrc[V,T with Streamable[T,V]] (dat:T) (out:opchannel[opt[V]])
    {
      psource[opt[V]] (dat.iterator) out;
    }
  
  
    //$ Wire a source component to a sink.
    //$ Return coupled fibre ready to run.
    fun pipe[T] 
      (w: opchannel[T] -> 0,
      r: ipchannel[T] -> 0)
    :
      1 -> 0
    => 
      {
        var chi,cho = mk_iopchannel_pair[T] ();
        spawn_pthread { (w cho); };
        spawn_pthread { (r chi); };
      }
    ;
  
    //$ Wire a source component to a transducer.
    //$ Return source.
    fun pipe[T,U]
      (w: opchannel[T] -> 0,
      t: ipchannel[T] * opchannel[U] -> 0)
    :
      opchannel[U] -> 0 
    => 
      proc (out:opchannel[U])
      {
        var chi,cho = mk_iopchannel_pair[T] ();
        spawn_pthread { (w cho); };
        spawn_pthread { (t (chi, out)); };
      }
    ;
  
    //$ xpipe connects a streamable data structure
    //$ such as an array directly into a transducer.
    fun xpipe[V,T,U with Streamable[T,V]] 
      (
        a:T,
        t: ipchannel[opt[V]] * opchannel[U] -> 0
      )
      : opchannel[U] -> 0 =>
      pipe (a.pisrc[V],t)
    ;
  
  
    //$ Wire a transducer into a transducer.
    //$ Return another transducer.
    fun pipe[T,U,V]
      (a: ipchannel[T] * opchannel[U] -> 0,
      b: ipchannel[U] * opchannel[V] -> 0)
    :
      ipchannel[T] * opchannel[V] -> 0 
    => 
      proc (inp:ipchannel[T], out:opchannel[V])
      {
        var chi,cho = mk_iopchannel_pair[U] ();
        spawn_pthread { a (inp, cho); };
        spawn_pthread { b (chi, out); };
      }
    ;
  
    //$ Wire a transducer into a sink.
    //$ Return a sink.
    fun pipe[T,U]
      (a: ipchannel[T] * opchannel[U] -> 0,
      b: ipchannel[U] -> 0)
    :
      ipchannel[T]  -> 0 
    => 
      proc (inp:ipchannel[T])
      {
        var chi,cho = mk_iopchannel_pair[U] ();
        spawn_pthread { a (inp, cho); };
        spawn_pthread { b (chi); };
      }
    ;
  
  
    //$ Stream sort using intermediate darray.
    //$ Requires stream of option type.
    proc sort[T with Tord[T]] (r: ipchannel[opt[T]], w: opchannel[opt[T]])
    {
       var x = darray[T]();
       acquire:while true do
         match read r with
         | Some v => x+=v;
         | #None => break acquire;
         endmatch;
       done
       x.sort;
       for v in x do
         write (w, Some v);
       done
       write (w,None[T]);
    }
  }
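
A small usage sketch of the combinators above: an array is turned into a source of options with pisrc, wired into a (hypothetical) sink with pipe, and the resulting coupled procedure is then run.

  include "std/pthread/ppipe";
  
  proc sink (inp: ipchannel[opt[int]]) {
    while true do
      match read inp with
      | Some v => println$ "got " + v.str;
      | #None => return;
      endmatch;
    done
  }
  
  var runner = pipe (pisrc[int] (1,2,3,4), sink);
  call runner ();                    // spawns the writer and reader pthreads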
  

+ 5 Fork/Join.

share/lib/std/pthread/forkjoin.flx

  include "std/pthread/pchannels";
  
  //$ Implement fork/join protocol.
  open class ForkJoin 
  {
    //$ Launch a set of pthreads and wait
    //$ until all of them are finished.
    proc concurrently_by_iterator (var it:1 -> opt[1->0]) 
    {
       // Make a channel to signal termination.
       var iterm,oterm = mk_iopchannel_pair[int](); // should be unit but that bugs out at the moment
       noinline proc manager (var p: 1->0) () { p(); write (oterm, 1); }
       // Count the number of pthreads.
       var count = 0;
     again:>
       match #it with
       | Some p => 
         ++count; 
         spawn_pthread$ manager p; 
        goto again;
  
       | #None =>
         while count > 0 do
           C_hack::ignore (read iterm);
           --count;
         done
       endmatch;
    }
  
    proc concurrently[T with Streamable[T,1->0]] (d:T) => concurrently_by_iterator d.iterator;
  
  }
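
For example, concurrently accepts any streamable collection of procedures, such as an array; this sketch launches three pthreads and returns only after all of them have signalled completion.

  include "std/pthread/forkjoin";
  
  proc job (n:int) () { println$ "job " + n.str + " done"; }
  
  concurrently$ (job 1, job 2, job 3);   // blocks until all three finish
  println$ "all jobs finished";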

+ 6 Mutual Exclusion Lock (Mutex)

A mutex may be used to protect a region of memory conceptually associated with it, by locking the mutex for a short period of time. The region may then be modified atomically with respect to other pthreads that take the same lock.

A Felix mutex is created on the heap and must be destroyed manually after use; mutexes are not garbage collected.
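
A minimal sketch, using concurrently from the Fork/Join section above to wait for the workers: two pthreads update a shared counter under the lock, and the mutex is destroyed explicitly when it is no longer needed.

  include "std/pthread/mutex";
  include "std/pthread/forkjoin";
  
  var m = #mutex;
  var counter = 0;
  
  proc bump () {
    for i in 1..1000 do
      lock m;
      ++counter;                     // protected by the mutex
      unlock m;
    done
  }
  
  concurrently$ (bump, bump);        // run two updaters and wait for both
  println$ "counter = " + counter.str;
  destroy m;                         // mutexes are not garbage collected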

share/lib/std/pthread/mutex.flx

  
  open class Mutex
  {
    requires package "flx_pthread";
    // this needs to be fixed to work with gc but at the
    // moment the uglier solution will suffice
    type mutex = "::flx::pthread::flx_mutex_t*" requires mutex_hxx;
    ctor mutex: unit = "new ::flx::pthread::flx_mutex_t";
    proc lock: mutex = "$1->lock();";
    proc unlock: mutex = "$1->unlock();";
    proc destroy: mutex = "delete $1;";
  }

+ 7 Semaphores.

A semaphore is a counted lock. The sem_post procedure increments the counter and the sem_wait procedure decrements it. However, the counter may never become negative: if a decrement would make it negative, sem_wait instead blocks the calling pthread, which joins the set of pthreads waiting on the semaphore. When the counter is eventually incremented by a call to sem_post from some pthread, one of the waiting pthreads is allowed to proceed, decrementing the counter back to zero, while the remaining pthreads continue to wait.

The procedure sem_trywait never blocks; instead it returns a flag indicating whether or not it succeeded in decrementing the counter.

The term post is derived from the idea of posting a flag.

The counting feature of a semaphore is analogous to shoppers in a store. The sem_post function puts products on the shelf, whilst the sem_wait function represents a customer waiting on an order because stock is unavailable ... and sem_trywait is the customer who, seeing there is no stock available, decides to go elsewhere!
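
The sketch below illustrates the protocol. It is hypothetical: only sem_post, sem_wait and sem_trywait are named above, and the semaphore type and its constructor are assumptions, since std/pthread/semaphore is not reproduced in this document.

  // Hypothetical: the type `semaphore` and constructor #semaphore are assumed.
  var stock = #semaphore;            // counter starts at 0 (assumed)
  
  spawn_pthread {                    // producer: put items on the shelf
    sem_post stock;                  // counter 0 -> 1
    sem_post stock;                  // counter 1 -> 2
  };
  
  spawn_pthread {                    // consumer
    sem_wait stock;                  // blocks while the counter is 0, then decrements
    if sem_trywait stock do          // non-blocking attempt
      println$ "got a second item";
    else
      println$ "no stock, going elsewhere";
    done
  };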

+ 8 Condition Variables.

share/lib/std/pthread/condition_variable.flx

  
  //$ Condition Variable for pthread synchronisation.
  open class Condition_Variable
  {
    requires package "flx_pthread";
  
    //$ The type of a condition variable.
    type condition_variable = "::flx::pthread::flx_condv_t*" requires condv_hxx;
  
    //$ Condition variable constructor taking unit argument.
    ctor condition_variable: 1 = "new ::flx::pthread::flx_condv_t(ptf->gcp->collector->get_thread_control())";
  
    //$ Function to release a condition variable.
    proc destroy: condition_variable = "delete $1;";
  
    //$ Lock/unlock associated mutex.
    proc lock : condition_variable = "$1->lock();";
    proc unlock : condition_variable = "$1->unlock();";
  
    //$ Function to wait until a signal is raised on
    //$ the condition variable by another thread.
    proc wait: condition_variable = "$1->wait();";
  
    //$ Function to raise a signal on a condition
    //$ variable which will allow at most one thread
    //$ waiting on it to proceed.
    proc signal: condition_variable = "$1->signal();";
  
    //$ Function to broadcast a signal releasing all
    //$ threads waiting on a condition variable.
    proc broadcast: condition_variable = "$1->broadcast();";
  
    //$ Timed wait for signal on condition variable.
    //$ Time in seconds. Resolution nanoseconds.
    gen timedwait: condition_variable * double -> int = "$1->timedwait($2)";
  }
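
A usage sketch, modelled on the barrier in the ThreadPool class shown later in this document: a waiter sleeps on the condition variable until another pthread sets a flag and signals it.

  include "std/pthread/condition_variable";
  
  var cv = #condition_variable;
  var ready = false;
  
  spawn_pthread {                    // waiter
    cv.lock;
    while not ready do cv.wait; done // re-check the flag after each wakeup
    cv.unlock;
    println$ "ready!";
  };
  
  spawn_pthread {                    // signaller
    cv.lock;
    ready = true;
    cv.signal;                       // wakes at most one waiter; broadcast wakes all
    cv.unlock;
  };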
  

+ 9 Thread Safe Bound Queue.

share/lib/std/pthread/ts_bound_queue.flx

  
  open class TS_Bound_Queue
  {
    private uncopyable type bQ_ = "::flx::pthread::bound_queue_t";
    _gc_pointer _gc_type bQ_ type ts_bound_queue_t[T] = "::flx::pthread::bound_queue_t*" 
      requires 
       package "flx_bound_queue",
       scanner "::flx::pthread::bound_queue_scanner"
    ;
    ctor[T] ts_bound_queue_t[T]: !ints = 
      """
        new (*ptf->gcp, @0, false) ::flx::pthread::bound_queue_t(
        ptf->gcp->collector->get_thread_control(), (size_t)$1)
      """ requires property "needs_ptf";
  
    // NOTE: enqueue/dequeue on queues uses suspend/resume
    // to tell any pending collector it is safe to proceed
    // whilst it is doing the operations (which may block),
    // to block returning from the I/O during a collection
    // AND, if the I/O completed before the collection got
    // going, to yield at this point.
  
  
    private proc _enqueue[T]: ts_bound_queue_t[T] * &T = """
      FLX_SAVE_REGS;
  //fprintf(stderr,"enqueue to ts_bound_queue q=%p starts, item=%p\\n", $1, $2);
      //ptf-> gcp->collector->get_thread_control()->suspend();
      $1->enqueue((void*)$2);
  //fprintf(stderr,"enqueue to ts_bound_queue q=%p done, item=%p\\n", $1, $2);
      //ptf-> gcp->collector->get_thread_control()->resume();
    """;
  
  
    // Duh .. what happens if $2 storage location is set by
    // the dequeue in the middle of a collection?
    // it might be NULL when scanned, but by the time the queue
    // is scanned the value will be lost from the queue and
    // in the variable instead!
    // The RACE is on!
    private proc _dequeue[T]: ts_bound_queue_t[T] * &&T = """
      FLX_SAVE_REGS;
  //fprintf(stderr,"dequeue from ts_bound_queue %p starts\\n", $1);
      //ptf-> gcp->collector->get_thread_control()->suspend();
      *$2=(?1*)$1->dequeue();
  //fprintf(stderr,"dequeue from ts_bound_queue done q=%p item=%p\\n",$1,*$2);
      //ptf-> gcp->collector->get_thread_control()->resume();
    """;
  
    proc enqueue[T] (Q:ts_bound_queue_t[T])  (elt:T) {
       _enqueue(Q, new elt);
    }
  
    gen dequeue[T] (Q:ts_bound_queue_t[T]): T = {
      var x:&T;
      _dequeue (Q,&x);
      return *x;
    }
  
  
    proc wait[T]: ts_bound_queue_t[T] = """
      FLX_SAVE_REGS;
      //ptf-> gcp->collector->get_thread_control()->suspend();
      $1->wait_until_empty();
      //ptf-> gcp->collector->get_thread_control()->resume();
    """;
  
    proc resize[T]: ts_bound_queue_t[T] * !ints = "$1->resize((size_t)$2);";
   
  }
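
A usage sketch of the queue shown above: a producer and a consumer pthread share a bounded queue of ints. The producer blocks when the queue is full and the consumer blocks when it is empty.

  include "std/pthread/ts_bound_queue";
  
  var q = ts_bound_queue_t[int] 10;  // buffer at most 10 items
  
  spawn_pthread {                    // producer
    for i in 1..100 do call enqueue q i; done
  };
  spawn_pthread {                    // consumer
    for i in 1..100 do println$ (dequeue q).str; done
  };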
  

+ 10 Atomic operations

share/lib/std/pthread/atomic.flx

  open class Atomic
  {
    // note: only works for some types: constraints need to be added.
    // We have to use a pointer because atomics aren't copyable
  
    type atomic[T]="::std::atomic<?1>*" requires Cxx11_headers::atomic;
  
    // FIXME: not managed by GC yet!
    // constructor
    ctor[T] atomic[T]: T = "(new ::std::atomic<?1>($1))"; 
  
    proc delete[T] : atomic[T] = "delete $1;";
  
    // note: only works for even less types! Constraints needed.
    proc pre_incr[T] : &atomic[T] = "++**$1;";
    proc pre_decr[T] : &atomic[T] = "--**$1;";
    gen load[T] : atomic[T] -> T = "$1->load()";
    proc store[T] : atomic[T] * T = "$1->store($2);";
    proc store[T] (a:atomic[T]) (v:T) { store (a,v); }
  
    instance[T] Str[atomic[T]] {
      fun str (var x:atomic[T]) => x.load.str;
    }
    inherit[T] Str[atomic[T]];
  }
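
A usage sketch, again using concurrently from the Fork/Join section to wait for the workers: several pthreads bump a shared atomic counter without an explicit lock. The atomic handle must be deleted manually since it is not yet managed by the garbage collector.

  include "std/pthread/atomic";
  include "std/pthread/forkjoin";
  
  var hits = atomic 0;
  
  proc work () { for i in 1..1000 do ++hits; done }
  
  concurrently$ (work, work, work, work);
  println$ hits.load.str;            // always 4000
  delete hits;                       // not garbage collected yet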
  

+ 11 Thread Pool

A thread pool is a global object containing a set of running threads and a job queue. Instead of spawning a new thread, the client simply queues a job. Each thread grabs a job from the queue and runs it; on completion it grabs another job.

The primary advantage of a global thread pool is that it prevents oversaturation of the available processors and thus excessive context switching. The main downside is the difficulty of monitoring job completion.

Do not use the thread pool for quick jobs; there is significant overhead in posting a job.
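
A minimal usage sketch: start the pool, queue some jobs, wait for them with join, then shut the pool down. The greet procedure is illustrative only.

  include "std/pthread/threadpool";
  
  noinline proc greet (n:int) () { println$ "job " + n.str; }
  
  ThreadPool::start 4;                   // 4 worker threads
  for i in 1..10
    call ThreadPool::queue_job$ greet i;
  ThreadPool::join;                      // wait for the queued jobs to finish
  ThreadPool::stop;                      // terminate the workers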

share/lib/std/pthread/threadpool.flx

  
  include "std/pthread/ts_bound_queue";
  include "std/pthread/atomic";
  include "std/io/faio";
  include "std/pthread/condition_variable";
  include "std/pthread/pchannels";
  
  class ThreadPool
  {
    typedef job_t = 1 -> 0;
    private const ThreadStop : job_t = "NULL";
    private fun isStop : job_t -> bool = "$1==NULL";
    private var clock = #Faio::mk_alarm_clock;
    private var jobqueue = ts_bound_queue_t[job_t] 1024; // queue up to 1K jobs
    private var nthreads = 8; // great default for quad core i7 ?
  
    // number of threads actually running
    private var running = atomic 0;
  
    // number of threads blocked waiting on a barrier
    private var waiting = atomic 0;
  
    // barrier lock
    private var block = #condition_variable;
  
    fun get_nthreads () => nthreads;
  
    // This is a flag used to protect against nested pfor loops.
    // If there is a nested pfor loop, it will just execute serially
    // in the calling thread.
    private var pforrunning = atomic 0;
  
    proc barrier() {
  //println$ "Barrier";
      block.lock;
      ++waiting;
      if waiting.load == nthreads do
        waiting.store 0;
        block.broadcast;
      else
      again:>
        block.wait;
        if waiting.load != 0 goto again;
      done
      block.unlock; 
    }
  
    proc start () { 
  //println$ "Thread pool start()";
       for i in 1..nthreads call spawn_pthread jobhandler;
  //println$ "Threads spawned";
    }
  
    proc start (n:int) {
       nthreads = n;
       #start;
    }
  
    private proc jobhandler () {
  //println$ "Job handler thread #"+running.str+" started";
       var id = running;
       ++running;
       rpt:while true do
  //println$ "Trying to dequeue a job id=" + id.str;
         var job = dequeue jobqueue;
  //println$ "Job dequeued id="+id.str;
         if isStop job break rpt;
         job; 
         thread_yield();
       done
       --running;
    }
  
    proc queue_job (job:job_t) {
  //println$ "Queuing job";
      if running.load == 0 call start ();
      if nthreads > 0 do 
        call enqueue jobqueue job;
      else
        call job;
      done
    }
  
    proc stop () {
      for i in 1..nthreads 
        call enqueue jobqueue ThreadStop;
      while running.load != 0 
        call Faio::sleep(clock,0.001);
    }
  
    proc post_barrier() {
      if nthreads > 0
        for i in 1..nthreads call queue_job barrier;
    }
  
    proc notify (chan:opchannel[int]) () {
      write (chan,1);
    }
  
    proc join () {
      if nthreads > 0 do
        post_barrier;
        var ip,op = #mk_iopchannel_pair[int];
        queue_job$ notify op;
        var x = read ip;
        C_hack::ignore(x);
      done
    }
  
    proc pfor_segment (first:int, last:int) (lbody: int * int -> 1 -> 0)
    {
  //println$ "Pfor segment " + first.str + "," last.str;
      var N = last - first + 1;
      var nt = nthreads + 1;
      if pforrunning.load == 0 and N >= nthreads and nthreads > 0 do
        pforrunning.store 1;
        for var counter in 0 upto nt - 2 do
          var sfirst = first + (N * counter) / nt;
          var slast = first + (N * (counter + 1)) / nt - 1;
  //println$ "QUEUE JOB: Counter = " + counter.str + ", sfirst=" + sfirst.str + ", slast=" + slast.str;
          ThreadPool::queue_job$ lbody (sfirst, slast);
        done
        sfirst = first + (N * (nt - 1)) / nt;
        slast = last;
  //println$ "UNQUEUED JOB: Counter = " + counter.str + ", sfirst=" + sfirst.str + ", slast=" + slast.str;
        lbody (sfirst, slast) ();
        join;
        pforrunning.store 0;
      else
        // Run serially
        lbody (first, last) ();
      done
    }
  
    noinline proc forloop (lbody: int -> 0) (first:int, last:int) ()
    {
  //println$ "forloop " + first.str + "," + last.str;
      for var i in first upto last call lbody i; 
    }
    noinline proc pforloop (first: int) (last:int) (lbody: int -> 0)
    {
  //println$ "Pfor segment " + first.str + "," last.str;
      pfor_segment (first, last)  (forloop lbody);
    }
    inline proc tpfor (first:int, last:int, lbody: int-> 0)
    {
       pforloop first last lbody;
    }
   
  }

+ 11.1 Thread Pool Demo

share/demo/threadpoolex1.flx

  include "std/pthread/threadpool";
  open ThreadPool;
  
  // Matrix multiply
  macro val N = 1000;
  typedef N = 1000;
  
  typedef vec_t = array[double, N];
  typedef mx_t = array[vec_t,N];
  var a : mx_t;
  var b : mx_t;
  var r : mx_t;
  var s : mx_t;
  
  proc clear (mx:&mx_t) {
    for i in 0..<N 
    for j in 0..<N 
      perform mx . i . j <- 0.0;
  }
  
  proc rinit (mx:&mx_t) {
    for i in 0..<N
    for j in 0..<N
      perform mx . i . j <- #rand.double / RAND_MAX.double;
  }
  
  fun check() = {
  //println$ "Verification check";
    for i in 0..<N
    for j in 0..<N
      if r.i.j != s.i.j return false;
    return true;
  }
  
  proc verify() {
  //println$ "Running verify";
    if #check do
      println$ "Verified";
    else
      println "Wrong!";
    done 
  //println$ "Verify ran";
  }
  
  clear &r;
  clear &s;
  rinit &a;
  rinit &b;
  
  fun inner_product (pr: &vec_t, pc: &vec_t) = 
  {
    var sum = 0.0;
    for (var k=0; k<N; ++k;)
      perform sum = sum + *(pr.k) * *(pc.k);
    return sum;
  }
  
  // naive multiply
  var start = #time;
  begin
    for i in 0..<N 
    for (var j=0; j<N; ++j;)
      perform &r . i . j <- inner_product (&a.i, &b.j);
    s = r;
  end
  var fin = #time;
  println$ "Naive mul elapsed " + (fin - start).str + " seconds";
  
  //println$ "Starting thread pool";
  ThreadPool::start 8;
  //println$ "Thread pool started";
  
  // naive parallel multiply
  noinline proc inner_products_proc (var i:int)
  {
    for (var j=0; j<N; ++j;) 
      perform &r . i . j <- inner_product (&a.i, &b.j);
  }
  
  noinline proc inner_products_job (var i:int) () {
    for (var j=0; j<N; ++j;) 
      perform &r . i . j <- inner_product (&a.i, &b.j);
  }
  
  clear &r;
  start = #time;
  begin
    for i in 0..<N
      call ThreadPool::queue_job$ inner_products_job (i);
    ThreadPool::join;
  end
  fin = #time;
  println$ "Naive Parallel mul elapsed " + (fin - start).str + " seconds";
  verify;
  
  // smart parallel multiply
  clear &r;
  start = #time;
  begin
  println$ "Using thread pool's pforloop";
    ThreadPool::pforloop 0 (N - 1) inner_products_proc;
  end
  fin = #time;
  println$ "Smart Parallel mul elapsed " + (fin - start).str + " seconds";
  verify;
  
  // smart parallel multiply with syntax
  clear &r;
  start = #time;
  begin
    pfor i in 0 upto (N - 1) do
    for (var j=0; j<N; ++j;) 
      perform &r . i . j <- inner_product (&a.i, &b.j);
    done
  end
  fin = #time;
  println$ "pfor mul elapsed " + (fin - start).str + " seconds";
  verify;
  
  
  ThreadPool::stop;