![]() |
Threaded Socket implementation of Acceptor. More...
#include <ThreadedSocketAcceptor.h>
Classes | |
struct | AcceptorThreadInfo |
struct | ConnectionThreadInfo |
Public Member Functions | |
ThreadedSocketAcceptor (Application &, MessageStoreFactory &, const SessionSettings &) throw ( ConfigError ) | |
ThreadedSocketAcceptor (Application &, MessageStoreFactory &, const SessionSettings &, LogFactory &) throw ( ConfigError ) | |
virtual | ~ThreadedSocketAcceptor () |
Private Types | |
typedef std::set< int > | Sockets |
typedef std::set< SessionID > | Sessions |
typedef std::map< int, Sessions > | PortToSessions |
typedef std::map< int, int > | SocketToPort |
typedef std::map< int, thread_id > | SocketToThread |
Private Member Functions | |
bool | readSettings (const SessionSettings &) |
void | onConfigure (const SessionSettings &) throw ( ConfigError ) |
Implemented to configure acceptor. | |
void | onInitialize (const SessionSettings &) throw ( RuntimeError ) |
Implemented to initialize acceptor. | |
void | onStart () |
Implemented to start listening for connections. | |
bool | onPoll (double timeout) |
Implemented to connect and poll for events. | |
void | onStop () |
Implemented to stop a running acceptor. | |
void | addThread (int s, thread_id t) |
void | removeThread (int s) |
Static Private Member Functions | |
static THREAD_PROC | socketAcceptorThread (void *p) |
static THREAD_PROC | socketConnectionThread (void *p) |
Private Attributes | |
Sockets | m_sockets |
PortToSessions | m_portToSessions |
SocketToPort | m_socketToPort |
SocketToThread | m_threads |
Mutex | m_mutex |
Friends | |
class | SocketConnection |
Threaded Socket implementation of Acceptor.
Definition at line 36 of file ThreadedSocketAcceptor.h.
typedef std::map< int, Sessions > FIX::ThreadedSocketAcceptor::PortToSessions [private] |
Definition at line 73 of file ThreadedSocketAcceptor.h.
typedef std::set< SessionID > FIX::ThreadedSocketAcceptor::Sessions [private] |
Reimplemented from FIX::Acceptor.
Definition at line 72 of file ThreadedSocketAcceptor.h.
typedef std::set< int > FIX::ThreadedSocketAcceptor::Sockets [private] |
Definition at line 71 of file ThreadedSocketAcceptor.h.
typedef std::map< int, int > FIX::ThreadedSocketAcceptor::SocketToPort [private] |
Definition at line 74 of file ThreadedSocketAcceptor.h.
typedef std::map< int, thread_id > FIX::ThreadedSocketAcceptor::SocketToThread [private] |
Definition at line 75 of file ThreadedSocketAcceptor.h.
FIX::ThreadedSocketAcceptor::ThreadedSocketAcceptor | ( | Application & | application, | |
MessageStoreFactory & | factory, | |||
const SessionSettings & | settings | |||
) | throw ( ConfigError ) |
Definition at line 33 of file ThreadedSocketAcceptor.cpp.
References FIX::socket_init().
00037 : Acceptor( application, factory, settings ) 00038 { socket_init(); }
FIX::ThreadedSocketAcceptor::ThreadedSocketAcceptor | ( | Application & | application, | |
MessageStoreFactory & | factory, | |||
const SessionSettings & | settings, | |||
LogFactory & | logFactory | |||
) | throw ( ConfigError ) |
Definition at line 40 of file ThreadedSocketAcceptor.cpp.
References FIX::socket_init().
00045 : Acceptor( application, factory, settings, logFactory ) 00046 { 00047 socket_init(); 00048 }
FIX::ThreadedSocketAcceptor::~ThreadedSocketAcceptor | ( | ) | [virtual] |
Definition at line 50 of file ThreadedSocketAcceptor.cpp.
References FIX::socket_term().
00051 { 00052 socket_term(); 00053 }
void FIX::ThreadedSocketAcceptor::addThread | ( | int | s, | |
thread_id | t | |||
) | [private] |
Definition at line 184 of file ThreadedSocketAcceptor.cpp.
References m_mutex, m_threads, QF_STACK_POP, and QF_STACK_PUSH.
Referenced by onStart().
00185 { QF_STACK_PUSH(ThreadedSocketAcceptor::addThread) 00186 00187 Locker l(m_mutex); 00188 00189 m_threads[ s ] = t; 00190 00191 QF_STACK_POP 00192 }
void FIX::ThreadedSocketAcceptor::onConfigure | ( | const SessionSettings & | ) | throw ( ConfigError ) [private, virtual] |
Implemented to configure acceptor.
Reimplemented from FIX::Acceptor.
Definition at line 55 of file ThreadedSocketAcceptor.cpp.
References FIX::Dictionary::getBool(), FIX::Dictionary::getLong(), FIX::Dictionary::has(), QF_STACK_POP, QF_STACK_PUSH, FIX::SOCKET_ACCEPT_PORT, FIX::SOCKET_NODELAY, and FIX::SOCKET_REUSE_ADDRESS.
00057 { QF_STACK_PUSH(ThreadedSocketAcceptor::onConfigure) 00058 00059 std::set<SessionID> sessions = s.getSessions(); 00060 std::set<SessionID>::iterator i; 00061 for( i = sessions.begin(); i != sessions.end(); ++i ) 00062 { 00063 const Dictionary& settings = s.get( *i ); 00064 settings.getLong( SOCKET_ACCEPT_PORT ); 00065 if( settings.has(SOCKET_REUSE_ADDRESS) ) 00066 settings.getBool( SOCKET_REUSE_ADDRESS ); 00067 if( settings.has(SOCKET_NODELAY) ) 00068 settings.getBool( SOCKET_NODELAY ); 00069 } 00070 00071 QF_STACK_POP 00072 }
void FIX::ThreadedSocketAcceptor::onInitialize | ( | const SessionSettings & | ) | throw ( RuntimeError ) [private, virtual] |
Implemented to initialize acceptor.
Reimplemented from FIX::Acceptor.
Definition at line 74 of file ThreadedSocketAcceptor.cpp.
References FIX::IntConvertor::convert(), FIX::Dictionary::getLong(), FIX::Dictionary::has(), QF_STACK_POP, QF_STACK_PUSH, FIX::SOCKET_ACCEPT_PORT, FIX::socket_close(), FIX::socket_createAcceptor(), FIX::SOCKET_NODELAY, FIX::SOCKET_RECEIVE_BUFFER_SIZE, FIX::SOCKET_REUSE_ADDRESS, FIX::SOCKET_SEND_BUFFER_SIZE, and FIX::socket_setsockopt().
00076 { QF_STACK_PUSH(ThreadedSocketAcceptor::onInitialize) 00077 00078 short port = 0; 00079 std::set<int> ports; 00080 00081 std::set<SessionID> sessions = s.getSessions(); 00082 std::set<SessionID>::iterator i = sessions.begin(); 00083 for( ; i != sessions.end(); ++i ) 00084 { 00085 Dictionary settings = s.get( *i ); 00086 port = (short)settings.getLong( SOCKET_ACCEPT_PORT ); 00087 00088 m_portToSessions[port].insert( *i ); 00089 00090 if( ports.find(port) != ports.end() ) 00091 continue; 00092 ports.insert( port ); 00093 00094 const bool reuseAddress = settings.has( SOCKET_REUSE_ADDRESS ) ? 00095 s.get().getBool( SOCKET_REUSE_ADDRESS ) : true; 00096 00097 const bool noDelay = settings.has( SOCKET_NODELAY ) ? 00098 s.get().getBool( SOCKET_NODELAY ) : false; 00099 00100 const int sendBufSize = settings.has( SOCKET_SEND_BUFFER_SIZE ) ? 00101 s.get().getLong( SOCKET_SEND_BUFFER_SIZE ) : 0; 00102 00103 const int rcvBufSize = settings.has( SOCKET_RECEIVE_BUFFER_SIZE ) ? 00104 s.get().getLong( SOCKET_RECEIVE_BUFFER_SIZE ) : 0; 00105 00106 int socket = socket_createAcceptor( port, reuseAddress ); 00107 if( socket < 0 ) 00108 { 00109 SocketException e; 00110 socket_close( socket ); 00111 throw RuntimeError( "Unable to create, bind, or listen to port " 00112 + IntConvertor::convert( (unsigned short)port ) + " (" + e.what() + ")" ); 00113 } 00114 if( noDelay ) 00115 socket_setsockopt( socket, TCP_NODELAY ); 00116 if( sendBufSize ) 00117 socket_setsockopt( socket, SO_SNDBUF, sendBufSize ); 00118 if( rcvBufSize ) 00119 socket_setsockopt( socket, SO_RCVBUF, rcvBufSize ); 00120 00121 m_socketToPort[socket] = port; 00122 m_sockets.insert( socket ); 00123 } 00124 00125 QF_STACK_POP 00126 }
bool FIX::ThreadedSocketAcceptor::onPoll | ( | double | second | ) | [private, virtual] |
Implemented to connect and poll for events.
Implements FIX::Acceptor.
Definition at line 145 of file ThreadedSocketAcceptor.cpp.
References QF_STACK_POP, and QF_STACK_PUSH.
00146 { QF_STACK_PUSH(ThreadedSocketAcceptor::onPoll) 00147 00148 return false; 00149 00150 QF_STACK_POP 00151 }
void FIX::ThreadedSocketAcceptor::onStart | ( | ) | [private, virtual] |
Implemented to start listening for connections.
Implements FIX::Acceptor.
Definition at line 128 of file ThreadedSocketAcceptor.cpp.
References addThread(), m_mutex, m_sockets, m_socketToPort, QF_STACK_POP, QF_STACK_PUSH, socketAcceptorThread(), and FIX::thread_spawn().
00129 { QF_STACK_PUSH(ThreadedSocketAcceptor::onStart) 00130 00131 Sockets::iterator i; 00132 for( i = m_sockets.begin(); i != m_sockets.end(); ++i ) 00133 { 00134 Locker l( m_mutex ); 00135 int port = m_socketToPort[*i]; 00136 AcceptorThreadInfo* info = new AcceptorThreadInfo( this, *i, port ); 00137 thread_id thread; 00138 thread_spawn( &socketAcceptorThread, info, thread ); 00139 addThread( *i, thread ); 00140 } 00141 00142 QF_STACK_POP 00143 }
void FIX::ThreadedSocketAcceptor::onStop | ( | ) | [private, virtual] |
Implemented to stop a running acceptor.
Implements FIX::Acceptor.
Definition at line 153 of file ThreadedSocketAcceptor.cpp.
References FIX::Acceptor::isLoggedOn(), m_mutex, m_threads, QF_STACK_POP, QF_STACK_PUSH, FIX::socket_close(), FIX::Acceptor::start(), and FIX::thread_join().
00154 { QF_STACK_PUSH(ThreadedSocketAcceptor::onStop) 00155 00156 SocketToThread threads; 00157 SocketToThread::iterator i; 00158 00159 { 00160 Locker l(m_mutex); 00161 00162 time_t start = 0; 00163 time_t now = 0; 00164 00165 ::time( &start ); 00166 while ( isLoggedOn() ) 00167 { 00168 if( ::time(&now) -5 >= start ) 00169 break; 00170 } 00171 00172 threads = m_threads; 00173 m_threads.clear(); 00174 } 00175 00176 for ( i = threads.begin(); i != threads.end(); ++i ) 00177 socket_close( i->first ); 00178 for ( i = threads.begin(); i != threads.end(); ++i ) 00179 thread_join( i->second ); 00180 00181 QF_STACK_POP 00182 }
bool FIX::ThreadedSocketAcceptor::readSettings | ( | const SessionSettings & | ) | [private] |
void FIX::ThreadedSocketAcceptor::removeThread | ( | int | s | ) | [private] |
Definition at line 194 of file ThreadedSocketAcceptor.cpp.
References m_mutex, m_threads, QF_STACK_POP, QF_STACK_PUSH, and FIX::thread_detach().
00195 { QF_STACK_PUSH(ThreadedSocketAcceptor::removeThread) 00196 00197 Locker l(m_mutex); 00198 SocketToThread::iterator i = m_threads.find( s ); 00199 if ( i != m_threads.end() ) 00200 { 00201 thread_detach( i->second ); 00202 m_threads.erase( i ); 00203 } 00204 00205 QF_STACK_POP 00206 }
THREAD_PROC FIX::ThreadedSocketAcceptor::socketAcceptorThread | ( | void * | p | ) | [static, private] |
Definition at line 208 of file ThreadedSocketAcceptor.cpp.
References QF_STACK_CATCH, QF_STACK_POP, QF_STACK_PUSH, QF_STACK_TRY, FIX::socket_accept(), FIX::socket_getsockopt(), FIX::socket_peername(), FIX::socket_setsockopt(), socketConnectionThread(), and FIX::thread_spawn().
Referenced by onStart().
00209 { QF_STACK_TRY 00210 QF_STACK_PUSH(ThreadedSocketAcceptor::socketAcceptorThread) 00211 00212 AcceptorThreadInfo * info = reinterpret_cast < AcceptorThreadInfo* > ( p ); 00213 00214 ThreadedSocketAcceptor* pAcceptor = info->m_pAcceptor; 00215 int s = info->m_socket; 00216 int port = info->m_port; 00217 delete info; 00218 00219 int noDelay = 0; 00220 int sendBufSize = 0; 00221 int rcvBufSize = 0; 00222 socket_getsockopt( s, TCP_NODELAY, noDelay ); 00223 socket_getsockopt( s, SO_SNDBUF, sendBufSize ); 00224 socket_getsockopt( s, SO_RCVBUF, rcvBufSize ); 00225 00226 int socket = 0; 00227 while ( ( !pAcceptor->isStopped() && ( socket = socket_accept( s ) ) >= 0 ) ) 00228 { 00229 if( noDelay ) 00230 socket_setsockopt( socket, TCP_NODELAY ); 00231 if( sendBufSize ) 00232 socket_setsockopt( socket, SO_SNDBUF, sendBufSize ); 00233 if( rcvBufSize ) 00234 socket_setsockopt( socket, SO_RCVBUF, rcvBufSize ); 00235 00236 Sessions sessions = pAcceptor->m_portToSessions[port]; 00237 00238 ThreadedSocketConnection * pConnection = 00239 new ThreadedSocketConnection 00240 ( socket, sessions, pAcceptor->getApplication(), pAcceptor->getLog() ); 00241 00242 ConnectionThreadInfo* info = new ConnectionThreadInfo( pAcceptor, pConnection ); 00243 00244 { 00245 Locker l( pAcceptor->m_mutex ); 00246 00247 std::stringstream stream; 00248 stream << "Accepted connection from " << socket_peername( socket ) << " on port " << port; 00249 00250 if( pAcceptor->getLog() ) 00251 pAcceptor->getLog()->onEvent( stream.str() ); 00252 00253 thread_id thread; 00254 if ( !thread_spawn( &socketConnectionThread, info, thread ) ) 00255 delete info; 00256 pAcceptor->addThread( socket, thread ); 00257 } 00258 } 00259 00260 if( !pAcceptor->isStopped() ) 00261 pAcceptor->removeThread( s ); 00262 00263 return 0; 00264 00265 QF_STACK_POP 00266 QF_STACK_CATCH 00267 }
THREAD_PROC FIX::ThreadedSocketAcceptor::socketConnectionThread | ( | void * | p | ) | [static, private] |
Definition at line 269 of file ThreadedSocketAcceptor.cpp.
References FIX::ThreadedSocketConnection::getSocket(), QF_STACK_CATCH, QF_STACK_POP, QF_STACK_PUSH, QF_STACK_TRY, and FIX::ThreadedSocketConnection::read().
Referenced by socketAcceptorThread().
00270 { QF_STACK_TRY 00271 QF_STACK_PUSH(ThreadedSocketAcceptor::socketConnectionThread) 00272 00273 ConnectionThreadInfo * info = reinterpret_cast < ConnectionThreadInfo* > ( p ); 00274 00275 ThreadedSocketAcceptor* pAcceptor = info->m_pAcceptor; 00276 ThreadedSocketConnection* pConnection = info->m_pConnection; 00277 delete info; 00278 00279 int socket = pConnection->getSocket(); 00280 00281 while ( pConnection->read() ) {} 00282 delete pConnection; 00283 if( !pAcceptor->isStopped() ) 00284 pAcceptor->removeThread( socket ); 00285 return 0; 00286 00287 QF_STACK_POP 00288 QF_STACK_CATCH 00289 }
friend class SocketConnection [friend] |
Definition at line 38 of file ThreadedSocketAcceptor.h.
Mutex FIX::ThreadedSocketAcceptor::m_mutex [private] |
Definition at line 93 of file ThreadedSocketAcceptor.h.
Referenced by addThread(), onStart(), onStop(), and removeThread().
Definition at line 90 of file ThreadedSocketAcceptor.h.
Definition at line 89 of file ThreadedSocketAcceptor.h.
Referenced by onStart().
Definition at line 91 of file ThreadedSocketAcceptor.h.
Referenced by onStart().
Definition at line 92 of file ThreadedSocketAcceptor.h.
Referenced by addThread(), onStop(), and removeThread().