share/lib/rtl/faio_posixio.hpp
#ifndef __FLX_FAIO_POSIXIO_H__
#define __FLX_FAIO_POSIXIO_H__
#include <flx_faio_config.hpp>
#include "flx_async.hpp"
#include "demux_posix_demuxer.hpp"
#include "demux_timer_queue.hpp"
namespace flx { namespace faio {
class FAIO_EXTERN socketio_wakeup : public demux::socket_wakeup {
public:
demux::sel_param pb; int sio_flags; class socketio_request *request;
virtual void wakeup(demux::posix_demuxer& demux);
};
class FAIO_EXTERN socketio_request : public ::flx::async::flx_driver_request_base {
public:
socketio_wakeup sv;
demux::posix_demuxer *pd;
socketio_request() {} socketio_request(socketio_request const&);
void operator = (socketio_request const&);
socketio_request(demux::posix_demuxer *pd_a, int s, char* buf, long len, bool r);
bool start_async_op_impl();
};
class FAIO_EXTERN connect_request
: public ::flx::async::flx_driver_request_base, public demux::connect_control_block {
public:
demux::posix_demuxer *pd;
connect_request() {}
connect_request(demux::posix_demuxer *pd_a,const char* addr, int port);
bool start_async_op_impl();
virtual void wakeup(demux::posix_demuxer&);
};
class FAIO_EXTERN accept_request
: public ::flx::async::flx_driver_request_base, public demux::accept_control_block {
public:
demux::posix_demuxer *pd;
accept_request() {}
accept_request(demux::posix_demuxer *pd_a, int listener) : pd(pd_a) { s = listener; }
bool start_async_op_impl();
virtual void wakeup(demux::posix_demuxer& demux);
};
}}
#endif
share/src/faio/faio_posixio.cpp
#include <stdio.h> #include "faio_posixio.hpp"
#include "demux_sockety.hpp"
#include <sys/types.h> #include <sys/socket.h>
#include <unistd.h> #include <string.h> #include <assert.h>
using namespace flx::demux;
namespace flx { namespace faio {
connect_request::connect_request(demux::posix_demuxer *pd_a,const char* addr, int port) :pd(pd_a) { addy = addr; p = port; s=-1; }
socketio_request::socketio_request(demux::posix_demuxer *pd_a, int s, char* buf, long len, bool read)
: pd(pd_a)
{
sv.s = s;
sv.request = this;
sv.sio_flags = ((read) ? PDEMUX_READ : PDEMUX_WRITE);
sv.pb.buffer = buf;
sv.pb.buffer_size = len;
sv.pb.bytes_written = 0; }
socketio_request::socketio_request(socketio_request const &a) : pd(a.pd)
{
sv = a.sv;
sv.request = this;
}
void socketio_request::operator=(socketio_request const &a)
{
flx_driver_request_base::operator=(a);
sv = a.sv;
sv.request = this;
pd = a.pd;
}
bool
socketio_request::start_async_op_impl()
{
if(sv.s == -1) {
fprintf(stderr, "Attempt to start_async_op on socket -1\n");
exit(1);
}
bool failed = (pd->add_socket_wakeup(&sv, sv.sio_flags) == -1);
if (failed)
fprintf(stderr,"socketio_request FAILED %p, sock=%d, dir=%d\n",this, sv.s, sv.sio_flags);
return failed;
}
void
socketio_wakeup::wakeup(posix_demuxer& demux)
{
bool connection_closed;
if(wakeup_flags & PDEMUX_ERROR)
{
connection_closed = true;
fprintf(stderr,"posix faio wakeup PDEMUX_ERROR, connection closed = %d\n", connection_closed);
}
else if(wakeup_flags & PDEMUX_EOF)
{
connection_closed = true;
fprintf(stderr,"posix faio wakeup PDEMUX_EOF, connection closed = %d\n", connection_closed);
}
else if(wakeup_flags & PDEMUX_READ)
{
assert(wakeup_flags == PDEMUX_READ);
connection_closed = posix_demuxer::socket_recv(s, &pb);
}
else
{
assert(wakeup_flags == PDEMUX_WRITE);
connection_closed = posix_demuxer::socket_send(s, &pb);
}
if(connection_closed || pb.bytes_written == pb.buffer_size)
{
request->notify_finished();
return;
}
fprintf(stderr, "Incomplete request on %d, waiting for more I/O\n",s);
if(demux.add_socket_wakeup(this, sio_flags) == -1)
fprintf(stderr,"failed to re-add_socket_wakeup\n");
}
bool
connect_request::start_async_op_impl()
{
if(start(*pd) == -1) {
fprintf(stderr, "FAILED TO SPAWN CONNECT REQUEST\n");
return true;
}
return false;
/
}
void
connect_request::wakeup(posix_demuxer& demux)
{
connect_control_block::wakeup(demux);
notify_finished();
}
bool
accept_request::start_async_op_impl()
{
bool failed = (start(*pd) == -1); if(failed)
fprintf(stderr, "FAILED TO SPAWN ACCEPT REQUEST\n");
return failed;
}
void
accept_request::wakeup(posix_demuxer& demux)
{
accept_control_block::wakeup(demux);
if(accepted == -1)
{
fprintf(stderr, "accept request failed (%i), retrying...\n",
socket_err);
if(start(demux) == -1)
fprintf(stderr, "failed again... probably was a bad idea\n");
return;
}
notify_finished();
}
}}
share/lib/rtl/faio_winio.hpp
#ifndef __FLX_FAIO_WINIO_H__
#define __FLX_FAIO_WINIO_H__
#include <flx_faio_config.hpp>
#include <WinSock2.h>
#include <MSWSock.h>
#include "flx_async.hpp"
#include "demux_overlapped.hpp"
namespace flx { namespace faio {
class FAIO_EXTERN iocp_associator : public ::flx::async::flx_driver_request_base {
SOCKET s;
public:
demux::iocp_demuxer *iod;
iocp_associator() : iod(0) {} iocp_associator(demux::iocp_demuxer *iod_a, SOCKET associatee)
: s(associatee), iod(iod_a) {}
bool start_async_op_impl();
};
class FAIO_EXTERN waio_base : public ::flx::async::flx_driver_request_base {
protected:
::flx::async::finote_t *fn_a;
public:
demux::iocp_demuxer *iod;
bool success;
waio_base() : iod(0), success(false) {}
waio_base(demux::iocp_demuxer *iod_a) : iod(iod_a), success(false) {}
virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
LPOVERLAPPED olp, int err);
};
class FAIO_EXTERN wasync_accept
: public waio_base, public demux::acceptex_control_block
{
public:
wasync_accept() {}
wasync_accept(demux::iocp_demuxer *iod_a,SOCKET l, SOCKET a) : waio_base(iod_a) { listener = l; acceptor = a; }
bool start_async_op_impl();
virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
LPOVERLAPPED olp, int err);
};
class FAIO_EXTERN connect_ex
: public waio_base, public demux::connectex_control_block
{
public:
connect_ex() {}
connect_ex(demux::iocp_demuxer *iod_a,SOCKET soc, const char* addr, int port)
: waio_base(iod_a) { s = soc; addy = addr; p = port; }
bool start_async_op_impl();
virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
LPOVERLAPPED olp, int err);
};
class FAIO_EXTERN wasync_transmit_file
: public waio_base, public demux::transmitfile_control_block
{
public:
wasync_transmit_file()
: waio_base(0), transmitfile_control_block(INVALID_SOCKET, NULL) {}
wasync_transmit_file(demux::iocp_demuxer *iod_a,SOCKET dst) : waio_base(iod_a), transmitfile_control_block(dst) {}
wasync_transmit_file(demux::iocp_demuxer *iod_a,SOCKET dst, HANDLE src) : waio_base(iod_a), transmitfile_control_block(dst, src) {}
bool start_async_op_impl();
virtual void iocp_op_finished(DWORD nbytes, ULONG_PTR udat,
LPOVERLAPPED olp, int err);
};
class FAIO_EXTERN wsa_socketio
: public waio_base, public demux::wsasocketio_control_block
{
public:
wsa_socketio()
: wsasocketio_control_block(INVALID_SOCKET, NULL, false) {}
wsa_socketio(demux::iocp_demuxer *iod_a,SOCKET src, demux::sel_param* ppb, bool read)
: waio_base(iod_a), wsasocketio_control_block(src, ppb, read) {}
bool start_async_op_impl();
virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
LPOVERLAPPED olp, int err);
};
}}
#endif
share/src/faio/faio_winio.cpp
#include "faio_winio.hpp"
#include <stdio.h>
using namespace flx::demux;
namespace flx { namespace faio {
bool
iocp_associator::start_async_op_impl()
{
if(iod->associate_with_iocp((HANDLE)s, 0) != 0)
fprintf(stderr,"associate request failed - get result here!\n");
return true; }
void
waio_base::iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
LPOVERLAPPED olp, int err)
{
if(NO_ERROR != err)
fprintf(stderr,"catchall wakeup got error: %i (should store it)\n", err);
success = (NO_ERROR == err); notify_finished();
}
bool
wasync_accept::start_async_op_impl()
{
return start_overlapped();
}
void
wasync_accept::iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
LPOVERLAPPED olp, int err)
{
waio_base::iocp_op_finished(nbytes, udat, olp, err);
}
bool
connect_ex::start_async_op_impl()
{
return start_overlapped();
}
void
connect_ex::iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
LPOVERLAPPED olp, int err)
{
waio_base::iocp_op_finished(nbytes, udat, olp, err);
}
bool
wasync_transmit_file::start_async_op_impl()
{
return start_overlapped();
}
void
wasync_transmit_file::iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
LPOVERLAPPED olp, int err)
{
waio_base::iocp_op_finished(nbytes, udat, olp, err);
}
bool
wsa_socketio::start_async_op_impl()
{
return start_overlapped(); }
void
wsa_socketio::iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
LPOVERLAPPED olp, int err)
{
ppb->bytes_written += nbytes;
if(0 == nbytes || ppb->finished())
{
waio_base::iocp_op_finished(nbytes, udat, olp, err);
}
else
{
if(start_overlapped())
fprintf(stderr, "socketio restart finished! WHAT TO DO!?!\n");
}
}
}}
share/lib/rtl/faio_timer.hpp
#ifndef __FLX_FAIO_TIMER_H__
#define __FLX_FAIO_TIMER_H__
#include <flx_faio_config.hpp>
#include "demux_demuxer.hpp" #include "flx_async.hpp"
#include "demux_timer_queue.hpp"
#include "flx_rtl.hpp"
namespace flx { namespace faio {
class FAIO_EXTERN sleep_request
: public ::flx::async::flx_driver_request_base, public demux::sleep_task
{
demux::timer_queue *sleepers;
double delta;
public:
sleep_request() {}
sleep_request(demux::timer_queue *sleepers_a, double d) :
sleepers(sleepers_a), delta(d)
{}
bool start_async_op_impl();
void fire();
};
}} #endif
share/src/faio/faio_timer.cpp
#include "faio_timer.hpp"
using namespace flx::demux;
namespace flx { namespace faio {
bool
sleep_request::start_async_op_impl()
{
sleepers->add_sleep_request(this, delta);
return false; }
void sleep_request::fire() {
notify_finished();
}
}}
$PWD/src/config/timer.fpc
Name: Timer
Description: Real time clock services
Requires: faio
includes: '"faio_timer.hpp"'
$PWD/src/config/unix/faio.fpc
Name: faio
Description: Asynchronous I/O support
provides_dlib: -lfaio_dynamic
provides_slib: -lfaio_static
includes: '"faio_posixio.hpp"'
Requires: flx_async flx_pthread demux flx flx_gc
library: faio
macros: BUILD_FAIO
srcdir: src/faio
src: faio_(timer|posixio)\.cpp
headers: faio_(drv|timer|posixio)\.hpp
$PWD/src/config/win/faio.fpc
Name: faio
Description: Asynchronous I/O support
provides_dlib: /DEFAULTLIB:faio_dynamic
provides_slib: /DEFAULTLIB:faio_static
includes: '"faio_winio.hpp"'
Requires: flx_async flx_pthread demux flx flx_gc
library: faio
macros: BUILD_FAIO
srcdir: src/faio
src: faio_(timer|winio)\.cpp
headers: faio_(drv|timer|winio)\.hpp
$PWD/buildsystem/faio.py
import fbuild
from fbuild.functools import call
from fbuild.path import Path
from fbuild.record import Record
import buildsystem
# ------------------------------------------------------------------------------
def build_runtime(phase):
print('[fbuild] [faio]')
path = Path(phase.ctx.buildroot/'share'/'src/faio')
dst = 'host/lib/rtl/faio'
srcs = [
path / 'faio_timer.cpp',
]
includes = [
phase.ctx.buildroot / 'host/lib/rtl',
phase.ctx.buildroot / 'share/lib/rtl'
]
macros = ['BUILD_FAIO']
libs=[
call('buildsystem.flx_pthread.build_runtime', phase),
call('buildsystem.flx_async.build_runtime', phase),
call('buildsystem.demux.build_runtime', phase),
]
if 'win32' in phase.platform:
srcs.append(path / 'faio_winio.cpp')
includes.append(Path('src', 'demux', 'win'))
if 'posix' in phase.platform:
srcs.append(path / 'faio_posixio.cpp')
includes.append(Path('src', 'demux', 'posix'))
return Record(
static=buildsystem.build_cxx_static_lib(phase, dst, srcs,
includes=includes,
macros=macros,
libs=[lib.static for lib in libs]),
shared=buildsystem.build_cxx_shared_lib(phase, dst, srcs,
includes=includes,
macros=macros,
libs=[lib.shared for lib in libs]))
def build_flx(phase):
return
#return buildsystem.copy_flxs_to_lib(phase.ctx,
# Path('src/faio/*.flx').glob())
share/lib/rtl/flx_faio_config.hpp
#ifndef __FLX_FAIO_CONFIG_H__
#define __FLX_FAIO_CONFIG_H__
#include "flx_rtl_config.hpp"
#ifdef BUILD_FAIO
#define FAIO_EXTERN FLX_EXPORT
#else
#define FAIO_EXTERN FLX_IMPORT
#endif
#endif