2.1 Pthreads and Channels
Felix also supports pre-emptive threads with channels. The syntax is similar to that used for fibres and channels:
pthreads_01_data.data
Some test data. For pthreads_01.html.
include "std/pthread/pchannels"; proc client(x:ipchannel[opt[string]]) { var finished = false; while not finished do match read x with | Some line => print line; | #None => finished = true; endmatch; done } proc server(x:opchannel[opt[string]]) { var directory = System::argv 1; var filename = Filename::join (directory,"pthreads_01_data.data"); f := fopen_input_text filename; var line = readln f; while line != "" do write$ x,Some line; line = readln f; done; fclose f; write$ x, None[string]; } proc launch() { inp,out:= mk_iopchannel_pair[opt[string]](); spawn_pthread { client(inp); }; spawn_pthread { server (out); }; } launch;
Some test data. For pthreads_01.html.
2.1.1 Broadcast example
Here is the broadcast code using pthreads:
gen broadcaster() : opchannel[opt[opchannel[string]]] * // subscription channel opchannel[string] // news reporting channel = { // ------------------------------------------------------------- // SUBSCRIPTIONS // ------------------------------------------------------------- // subscriber list var clients = darray[opchannel[string]] (); // subscription channel iregistry, oregistry := mk_iopchannel_pair[opt[opchannel[string]]](); // accept registrations until end of broadcast // a subscription of None signals end of broadcast spawn_pthread { var end_of_news = false; while not end_of_news do match read iregistry with | #None => end_of_news = true; | Some chan => push_back (clients, chan); println "Got a subscriber"; endmatch; done println "Finished accepting subscribers"; }; // ------------------------------------------------------------- // RELAY THE NEWS // ------------------------------------------------------------- // news reading channel inews, onews := mk_iopchannel_pair[string](); // send news to all clients spawn_pthread { var news_line = read inews; while true do if len clients > 0uz do for var i in 0uz upto len clients - 1uz do write$ clients.i, news_line; done done if news_line == "END OF NEWS\n" do println "Finished news relay"; return; done news_line = read inews; done }; // return the channel for subscribing to news reports, // and the channel for making news reports return oregistry, onews; } // Create the broadcast station subscribe, news := broadcaster(); // create a template for bored train commuters who want to read the news proc commuter (i:int) { ichan, ochan := mk_iopchannel_pair[string](); write$ subscribe, Some ochan; //subscribe to news var line = read ichan; while line != "END OF NEWS\n" do print$ "Commuter " + str i + ": " + line; line = read ichan; done print$ "Commuter " + str i + ": " + line; } // create two commuters spawn_pthread { commuter 1; }; spawn_pthread { commuter 2; }; // create a reporter that just sends a file as news spawn_pthread { var i = 1; while i < 10 do print$ "Reporting : " + i.str; write$ news, i.str+"\n"; ++i; done write$ news, "END OF NEWS\n"; write$ subscribe, None[opchannel[string]]; println "Done reporting news"; }; println$ "Demo started";
You should note that there's no assurance the subscribers get all the news! In fact this code can deadlock, because the news can be over before one of the subscribers gets to subscribe. In that case the write to the subscription channel will hang forever since the reader is gone.