Index  Source Files  Annotated Class List  Alphabetical Class List  Class Hierarchy  Graphical Class Hierarchy 

FIX::ThreadedSocketAcceptor Class Reference

Threaded Socket implementation of Acceptor. More...

#include <ThreadedSocketAcceptor.h>

Inheritance diagram for FIX::ThreadedSocketAcceptor:
Inheritance graph
[legend]
Collaboration diagram for FIX::ThreadedSocketAcceptor:
Collaboration graph
[legend]

List of all members.

Classes

struct  AcceptorThreadInfo
struct  ConnectionThreadInfo

Public Member Functions

 ThreadedSocketAcceptor (Application &, MessageStoreFactory &, const SessionSettings &) throw ( ConfigError )
 ThreadedSocketAcceptor (Application &, MessageStoreFactory &, const SessionSettings &, LogFactory &) throw ( ConfigError )
virtual ~ThreadedSocketAcceptor ()

Private Types

typedef std::set< int > Sockets
typedef std::set< SessionIDSessions
typedef std::map< int, SessionsPortToSessions
typedef std::map< int, int > SocketToPort
typedef std::map< int, thread_idSocketToThread

Private Member Functions

bool readSettings (const SessionSettings &)
void onConfigure (const SessionSettings &) throw ( ConfigError )
 Implemented to configure acceptor.
void onInitialize (const SessionSettings &) throw ( RuntimeError )
 Implemented to initialize acceptor.
void onStart ()
 Implemented to start listening for connections.
bool onPoll (double timeout)
 Implemented to connect and poll for events.
void onStop ()
 Implemented to stop a running acceptor.
void addThread (int s, thread_id t)
void removeThread (int s)

Static Private Member Functions

static THREAD_PROC socketAcceptorThread (void *p)
static THREAD_PROC socketConnectionThread (void *p)

Private Attributes

Sockets m_sockets
PortToSessions m_portToSessions
SocketToPort m_socketToPort
SocketToThread m_threads
Mutex m_mutex

Friends

class SocketConnection

Detailed Description

Threaded Socket implementation of Acceptor.

Definition at line 36 of file ThreadedSocketAcceptor.h.


Member Typedef Documentation

typedef std::map< int, Sessions > FIX::ThreadedSocketAcceptor::PortToSessions [private]

Definition at line 73 of file ThreadedSocketAcceptor.h.

typedef std::set< SessionID > FIX::ThreadedSocketAcceptor::Sessions [private]

Reimplemented from FIX::Acceptor.

Definition at line 72 of file ThreadedSocketAcceptor.h.

typedef std::set< int > FIX::ThreadedSocketAcceptor::Sockets [private]

Definition at line 71 of file ThreadedSocketAcceptor.h.

typedef std::map< int, int > FIX::ThreadedSocketAcceptor::SocketToPort [private]

Definition at line 74 of file ThreadedSocketAcceptor.h.

typedef std::map< int, thread_id > FIX::ThreadedSocketAcceptor::SocketToThread [private]

Definition at line 75 of file ThreadedSocketAcceptor.h.


Constructor & Destructor Documentation

FIX::ThreadedSocketAcceptor::ThreadedSocketAcceptor ( Application application,
MessageStoreFactory factory,
const SessionSettings settings 
) throw ( ConfigError )

Definition at line 33 of file ThreadedSocketAcceptor.cpp.

References FIX::socket_init().

00037 : Acceptor( application, factory, settings )
00038 { socket_init(); }

FIX::ThreadedSocketAcceptor::ThreadedSocketAcceptor ( Application application,
MessageStoreFactory factory,
const SessionSettings settings,
LogFactory logFactory 
) throw ( ConfigError )

Definition at line 40 of file ThreadedSocketAcceptor.cpp.

References FIX::socket_init().

00045 : Acceptor( application, factory, settings, logFactory )
00046 { 
00047   socket_init(); 
00048 }

FIX::ThreadedSocketAcceptor::~ThreadedSocketAcceptor (  )  [virtual]

Definition at line 50 of file ThreadedSocketAcceptor.cpp.

References FIX::socket_term().

00051 { 
00052   socket_term(); 
00053 }


Member Function Documentation

void FIX::ThreadedSocketAcceptor::addThread ( int  s,
thread_id  t 
) [private]

Definition at line 184 of file ThreadedSocketAcceptor.cpp.

References m_mutex, m_threads, QF_STACK_POP, and QF_STACK_PUSH.

Referenced by onStart().

00185 { QF_STACK_PUSH(ThreadedSocketAcceptor::addThread)
00186 
00187   Locker l(m_mutex);
00188 
00189   m_threads[ s ] = t;
00190 
00191   QF_STACK_POP
00192 }

void FIX::ThreadedSocketAcceptor::onConfigure ( const SessionSettings  )  throw ( ConfigError ) [private, virtual]

Implemented to configure acceptor.

Reimplemented from FIX::Acceptor.

Definition at line 55 of file ThreadedSocketAcceptor.cpp.

References FIX::Dictionary::getBool(), FIX::Dictionary::getLong(), FIX::Dictionary::has(), QF_STACK_POP, QF_STACK_PUSH, FIX::SOCKET_ACCEPT_PORT, FIX::SOCKET_NODELAY, and FIX::SOCKET_REUSE_ADDRESS.

00057 { QF_STACK_PUSH(ThreadedSocketAcceptor::onConfigure)
00058 
00059   std::set<SessionID> sessions = s.getSessions();
00060   std::set<SessionID>::iterator i;
00061   for( i = sessions.begin(); i != sessions.end(); ++i )
00062   {
00063     const Dictionary& settings = s.get( *i );
00064     settings.getLong( SOCKET_ACCEPT_PORT );
00065     if( settings.has(SOCKET_REUSE_ADDRESS) )
00066       settings.getBool( SOCKET_REUSE_ADDRESS );
00067     if( settings.has(SOCKET_NODELAY) )
00068       settings.getBool( SOCKET_NODELAY );
00069   }
00070 
00071   QF_STACK_POP
00072 }

void FIX::ThreadedSocketAcceptor::onInitialize ( const SessionSettings  )  throw ( RuntimeError ) [private, virtual]

Implemented to initialize acceptor.

Reimplemented from FIX::Acceptor.

Definition at line 74 of file ThreadedSocketAcceptor.cpp.

References FIX::IntConvertor::convert(), FIX::Dictionary::getLong(), FIX::Dictionary::has(), QF_STACK_POP, QF_STACK_PUSH, FIX::SOCKET_ACCEPT_PORT, FIX::socket_close(), FIX::socket_createAcceptor(), FIX::SOCKET_NODELAY, FIX::SOCKET_RECEIVE_BUFFER_SIZE, FIX::SOCKET_REUSE_ADDRESS, FIX::SOCKET_SEND_BUFFER_SIZE, and FIX::socket_setsockopt().

00076 { QF_STACK_PUSH(ThreadedSocketAcceptor::onInitialize)
00077 
00078   short port = 0;
00079   std::set<int> ports;
00080 
00081   std::set<SessionID> sessions = s.getSessions();
00082   std::set<SessionID>::iterator i = sessions.begin();
00083   for( ; i != sessions.end(); ++i )
00084   {
00085     Dictionary settings = s.get( *i );
00086     port = (short)settings.getLong( SOCKET_ACCEPT_PORT );
00087 
00088     m_portToSessions[port].insert( *i );
00089 
00090     if( ports.find(port) != ports.end() )
00091       continue;
00092     ports.insert( port );
00093 
00094     const bool reuseAddress = settings.has( SOCKET_REUSE_ADDRESS ) ? 
00095       s.get().getBool( SOCKET_REUSE_ADDRESS ) : true;
00096 
00097     const bool noDelay = settings.has( SOCKET_NODELAY ) ? 
00098       s.get().getBool( SOCKET_NODELAY ) : false;
00099 
00100     const int sendBufSize = settings.has( SOCKET_SEND_BUFFER_SIZE ) ?
00101       s.get().getLong( SOCKET_SEND_BUFFER_SIZE ) : 0;
00102 
00103     const int rcvBufSize = settings.has( SOCKET_RECEIVE_BUFFER_SIZE ) ?
00104       s.get().getLong( SOCKET_RECEIVE_BUFFER_SIZE ) : 0;
00105 
00106     int socket = socket_createAcceptor( port, reuseAddress );
00107     if( socket < 0 )
00108     {
00109       SocketException e;
00110       socket_close( socket );
00111       throw RuntimeError( "Unable to create, bind, or listen to port " 
00112                          + IntConvertor::convert( (unsigned short)port ) + " (" + e.what() + ")" );
00113     }
00114     if( noDelay )
00115       socket_setsockopt( socket, TCP_NODELAY );
00116     if( sendBufSize )
00117       socket_setsockopt( socket, SO_SNDBUF, sendBufSize );
00118     if( rcvBufSize )
00119       socket_setsockopt( socket, SO_RCVBUF, rcvBufSize );
00120 
00121     m_socketToPort[socket] = port;
00122     m_sockets.insert( socket );
00123   }    
00124 
00125   QF_STACK_POP
00126 }

bool FIX::ThreadedSocketAcceptor::onPoll ( double  second  )  [private, virtual]

Implemented to connect and poll for events.

Implements FIX::Acceptor.

Definition at line 145 of file ThreadedSocketAcceptor.cpp.

References QF_STACK_POP, and QF_STACK_PUSH.

00146 { QF_STACK_PUSH(ThreadedSocketAcceptor::onPoll)
00147 
00148   return false;
00149 
00150   QF_STACK_POP
00151 }

void FIX::ThreadedSocketAcceptor::onStart (  )  [private, virtual]

Implemented to start listening for connections.

Implements FIX::Acceptor.

Definition at line 128 of file ThreadedSocketAcceptor.cpp.

References addThread(), m_mutex, m_sockets, m_socketToPort, QF_STACK_POP, QF_STACK_PUSH, socketAcceptorThread(), and FIX::thread_spawn().

00129 { QF_STACK_PUSH(ThreadedSocketAcceptor::onStart)
00130 
00131   Sockets::iterator i;
00132   for( i = m_sockets.begin(); i != m_sockets.end(); ++i )
00133   {
00134     Locker l( m_mutex );
00135     int port = m_socketToPort[*i];
00136     AcceptorThreadInfo* info = new AcceptorThreadInfo( this, *i, port );
00137     thread_id thread;
00138     thread_spawn( &socketAcceptorThread, info, thread );
00139     addThread( *i, thread );
00140   }
00141 
00142   QF_STACK_POP
00143 }

void FIX::ThreadedSocketAcceptor::onStop (  )  [private, virtual]

Implemented to stop a running acceptor.

Implements FIX::Acceptor.

Definition at line 153 of file ThreadedSocketAcceptor.cpp.

References FIX::Acceptor::isLoggedOn(), m_mutex, m_threads, QF_STACK_POP, QF_STACK_PUSH, FIX::socket_close(), FIX::Acceptor::start(), and FIX::thread_join().

00154 { QF_STACK_PUSH(ThreadedSocketAcceptor::onStop)
00155   
00156   SocketToThread threads;
00157   SocketToThread::iterator i;
00158 
00159   {
00160     Locker l(m_mutex);
00161 
00162     time_t start = 0;
00163     time_t now = 0;
00164 
00165     ::time( &start );
00166     while ( isLoggedOn() )
00167     {
00168       if( ::time(&now) -5 >= start )
00169         break;
00170     }
00171 
00172     threads = m_threads;
00173     m_threads.clear();
00174   }
00175 
00176   for ( i = threads.begin(); i != threads.end(); ++i )
00177     socket_close( i->first );
00178   for ( i = threads.begin(); i != threads.end(); ++i )
00179     thread_join( i->second );
00180 
00181   QF_STACK_POP
00182 }

bool FIX::ThreadedSocketAcceptor::readSettings ( const SessionSettings  )  [private]
void FIX::ThreadedSocketAcceptor::removeThread ( int  s  )  [private]

Definition at line 194 of file ThreadedSocketAcceptor.cpp.

References m_mutex, m_threads, QF_STACK_POP, QF_STACK_PUSH, and FIX::thread_detach().

00195 { QF_STACK_PUSH(ThreadedSocketAcceptor::removeThread)
00196 
00197   Locker l(m_mutex);
00198   SocketToThread::iterator i = m_threads.find( s );
00199   if ( i != m_threads.end() )
00200   {
00201     thread_detach( i->second );
00202     m_threads.erase( i );
00203   }
00204 
00205   QF_STACK_POP
00206 }

THREAD_PROC FIX::ThreadedSocketAcceptor::socketAcceptorThread ( void *  p  )  [static, private]

Definition at line 208 of file ThreadedSocketAcceptor.cpp.

References QF_STACK_CATCH, QF_STACK_POP, QF_STACK_PUSH, QF_STACK_TRY, FIX::socket_accept(), FIX::socket_getsockopt(), FIX::socket_peername(), FIX::socket_setsockopt(), socketConnectionThread(), and FIX::thread_spawn().

Referenced by onStart().

00209 { QF_STACK_TRY
00210   QF_STACK_PUSH(ThreadedSocketAcceptor::socketAcceptorThread)
00211 
00212   AcceptorThreadInfo * info = reinterpret_cast < AcceptorThreadInfo* > ( p );
00213 
00214   ThreadedSocketAcceptor* pAcceptor = info->m_pAcceptor;
00215   int s = info->m_socket;
00216   int port = info->m_port;
00217   delete info;
00218 
00219   int noDelay = 0;
00220   int sendBufSize = 0;
00221   int rcvBufSize = 0;
00222   socket_getsockopt( s, TCP_NODELAY, noDelay );
00223   socket_getsockopt( s, SO_SNDBUF, sendBufSize );
00224   socket_getsockopt( s, SO_RCVBUF, rcvBufSize );
00225 
00226   int socket = 0;
00227   while ( ( !pAcceptor->isStopped() && ( socket = socket_accept( s ) ) >= 0 ) )
00228   {
00229     if( noDelay )
00230       socket_setsockopt( socket, TCP_NODELAY );
00231     if( sendBufSize )
00232       socket_setsockopt( socket, SO_SNDBUF, sendBufSize );
00233     if( rcvBufSize )
00234       socket_setsockopt( socket, SO_RCVBUF, rcvBufSize );
00235 
00236     Sessions sessions = pAcceptor->m_portToSessions[port];
00237 
00238     ThreadedSocketConnection * pConnection =
00239       new ThreadedSocketConnection
00240         ( socket, sessions, pAcceptor->getApplication(), pAcceptor->getLog() );
00241 
00242     ConnectionThreadInfo* info = new ConnectionThreadInfo( pAcceptor, pConnection );
00243 
00244     {
00245       Locker l( pAcceptor->m_mutex );
00246 
00247       std::stringstream stream;
00248       stream << "Accepted connection from " << socket_peername( socket ) << " on port " << port;
00249 
00250       if( pAcceptor->getLog() )
00251         pAcceptor->getLog()->onEvent( stream.str() );
00252 
00253       thread_id thread;
00254       if ( !thread_spawn( &socketConnectionThread, info, thread ) )
00255         delete info;
00256       pAcceptor->addThread( socket, thread );
00257     }
00258   }
00259 
00260   if( !pAcceptor->isStopped() )
00261     pAcceptor->removeThread( s );
00262 
00263   return 0;
00264 
00265   QF_STACK_POP
00266   QF_STACK_CATCH
00267 }

THREAD_PROC FIX::ThreadedSocketAcceptor::socketConnectionThread ( void *  p  )  [static, private]

Definition at line 269 of file ThreadedSocketAcceptor.cpp.

References FIX::ThreadedSocketConnection::getSocket(), QF_STACK_CATCH, QF_STACK_POP, QF_STACK_PUSH, QF_STACK_TRY, and FIX::ThreadedSocketConnection::read().

Referenced by socketAcceptorThread().

00270 { QF_STACK_TRY
00271   QF_STACK_PUSH(ThreadedSocketAcceptor::socketConnectionThread)
00272 
00273   ConnectionThreadInfo * info = reinterpret_cast < ConnectionThreadInfo* > ( p );
00274 
00275   ThreadedSocketAcceptor* pAcceptor = info->m_pAcceptor;
00276   ThreadedSocketConnection* pConnection = info->m_pConnection;
00277   delete info;
00278 
00279   int socket = pConnection->getSocket();
00280 
00281   while ( pConnection->read() ) {}
00282   delete pConnection;
00283   if( !pAcceptor->isStopped() )
00284     pAcceptor->removeThread( socket );
00285   return 0;
00286 
00287   QF_STACK_POP
00288   QF_STACK_CATCH
00289 }


Friends And Related Function Documentation

friend class SocketConnection [friend]

Definition at line 38 of file ThreadedSocketAcceptor.h.


Member Data Documentation

Definition at line 93 of file ThreadedSocketAcceptor.h.

Referenced by addThread(), onStart(), onStop(), and removeThread().

Definition at line 90 of file ThreadedSocketAcceptor.h.

Definition at line 89 of file ThreadedSocketAcceptor.h.

Referenced by onStart().

Definition at line 91 of file ThreadedSocketAcceptor.h.

Referenced by onStart().

Definition at line 92 of file ThreadedSocketAcceptor.h.

Referenced by addThread(), onStop(), and removeThread().


The documentation for this class was generated from the following files:

Generated on Mon Apr 5 21:00:12 2010 for QuickFIX by doxygen 1.6.1 written by Dimitri van Heesch, © 1997-2001