Classes | Public Member Functions | Private Types | Private Member Functions | Static Private Member Functions | Private Attributes | Friends | List of all members
FIX::ThreadedSocketAcceptor Class Reference

Threaded Socket implementation of Acceptor. More...

#include <ThreadedSocketAcceptor.h>

Inheritance diagram for FIX::ThreadedSocketAcceptor:
Inheritance graph
[legend]
Collaboration diagram for FIX::ThreadedSocketAcceptor:
Collaboration graph
[legend]

Classes

struct  AcceptorThreadInfo
 
struct  ConnectionThreadInfo
 

Public Member Functions

 ThreadedSocketAcceptor (Application &, MessageStoreFactory &, const SessionSettings &) throw ( ConfigError )
 
 ThreadedSocketAcceptor (Application &, MessageStoreFactory &, const SessionSettings &, LogFactory &) throw ( ConfigError )
 
virtual ~ThreadedSocketAcceptor ()
 
- Public Member Functions inherited from FIX::Acceptor
 Acceptor (Application &, MessageStoreFactory &, const SessionSettings &) throw ( ConfigError )
 
 Acceptor (Application &, MessageStoreFactory &, const SessionSettings &, LogFactory &) throw ( ConfigError )
 
virtual ~Acceptor ()
 
LoggetLog ()
 
void start () throw ( ConfigError, RuntimeError )
 Start acceptor. More...
 
void block () throw ( ConfigError, RuntimeError )
 Block on the acceptor. More...
 
bool poll (double timeout=0.0) throw ( ConfigError, RuntimeError )
 Poll the acceptor. More...
 
void stop (bool force=false)
 Stop acceptor. More...
 
bool isLoggedOn ()
 Check to see if any sessions are currently logged on. More...
 
SessiongetSession (const std::string &msg, Responder &)
 
const std::set< SessionID > & getSessions () const
 
SessiongetSession (const SessionID &sessionID) const
 
const Dictionary *const getSessionSettings (const SessionID &sessionID) const
 
bool has (const SessionID &id)
 
bool isStopped ()
 
ApplicationgetApplication ()
 
MessageStoreFactorygetMessageStoreFactory ()
 

Private Types

typedef std::set< int > Sockets
 
typedef std::set< SessionIDSessions
 
typedef std::map< int, SessionsPortToSessions
 
typedef std::map< int, int > SocketToPort
 
typedef std::map< int, thread_idSocketToThread
 

Private Member Functions

bool readSettings (const SessionSettings &)
 
void onConfigure (const SessionSettings &) throw ( ConfigError )
 Implemented to configure acceptor. More...
 
void onInitialize (const SessionSettings &) throw ( RuntimeError )
 Implemented to initialize acceptor. More...
 
void onStart ()
 Implemented to start listening for connections. More...
 
bool onPoll (double timeout)
 Implemented to connect and poll for events. More...
 
void onStop ()
 Implemented to stop a running acceptor. More...
 
void addThread (int s, thread_id t)
 
void removeThread (int s)
 

Static Private Member Functions

static THREAD_PROC socketAcceptorThread (void *p)
 
static THREAD_PROC socketConnectionThread (void *p)
 

Private Attributes

Sockets m_sockets
 
PortToSessions m_portToSessions
 
SocketToPort m_socketToPort
 
SocketToThread m_threads
 
Mutex m_mutex
 

Friends

class SocketConnection
 

Additional Inherited Members

- Protected Attributes inherited from FIX::Acceptor
SessionSettings m_settings
 

Detailed Description

Threaded Socket implementation of Acceptor.

Definition at line 53 of file ThreadedSocketAcceptor.h.

Member Typedef Documentation

◆ PortToSessions

typedef std::map< int, Sessions > FIX::ThreadedSocketAcceptor::PortToSessions
private

Definition at line 107 of file ThreadedSocketAcceptor.h.

◆ Sessions

typedef std::set< SessionID > FIX::ThreadedSocketAcceptor::Sessions
private

Definition at line 106 of file ThreadedSocketAcceptor.h.

◆ Sockets

typedef std::set< int > FIX::ThreadedSocketAcceptor::Sockets
private

Definition at line 105 of file ThreadedSocketAcceptor.h.

◆ SocketToPort

typedef std::map< int, int > FIX::ThreadedSocketAcceptor::SocketToPort
private

Definition at line 108 of file ThreadedSocketAcceptor.h.

◆ SocketToThread

typedef std::map< int, thread_id > FIX::ThreadedSocketAcceptor::SocketToThread
private

Definition at line 109 of file ThreadedSocketAcceptor.h.

Constructor & Destructor Documentation

◆ ThreadedSocketAcceptor() [1/2]

FIX::ThreadedSocketAcceptor::ThreadedSocketAcceptor ( Application application,
MessageStoreFactory factory,
const SessionSettings settings 
)
throw (ConfigError
)

Definition at line 49 of file ThreadedSocketAcceptor.cpp.

50 {
51  socket_term();
52 }
53 
54 void ThreadedSocketAcceptor::onConfigure( const SessionSettings& s )

References FIX::socket_term().

◆ ThreadedSocketAcceptor() [2/2]

FIX::ThreadedSocketAcceptor::ThreadedSocketAcceptor ( Application application,
MessageStoreFactory factory,
const SessionSettings settings,
LogFactory logFactory 
)
throw (ConfigError
)

Definition at line 56 of file ThreadedSocketAcceptor.cpp.

56 {
57  std::set<SessionID> sessions = s.getSessions();
58  std::set<SessionID>::iterator i;
59  for( i = sessions.begin(); i != sessions.end(); ++i )
60  {
61  const Dictionary& settings = s.get( *i );
62  settings.getInt( SOCKET_ACCEPT_PORT );
63  if( settings.has(SOCKET_REUSE_ADDRESS) )
64  settings.getBool( SOCKET_REUSE_ADDRESS );

References FIX::Dictionary::getBool(), FIX::Dictionary::getInt(), FIX::Dictionary::has(), FIX::SOCKET_ACCEPT_PORT, FIX::SOCKET_NODELAY, and FIX::SOCKET_REUSE_ADDRESS.

◆ ~ThreadedSocketAcceptor()

FIX::ThreadedSocketAcceptor::~ThreadedSocketAcceptor ( )
virtual

Definition at line 66 of file ThreadedSocketAcceptor.cpp.

Member Function Documentation

◆ addThread()

void FIX::ThreadedSocketAcceptor::addThread ( int  s,
thread_id  t 
)
private

Definition at line 185 of file ThreadedSocketAcceptor.cpp.

187 {
188  AcceptorThreadInfo * info = reinterpret_cast < AcceptorThreadInfo* > ( p );
189 
190  ThreadedSocketAcceptor* pAcceptor = info->m_pAcceptor;

References FIX::ThreadedSocketAcceptor::AcceptorThreadInfo::m_pAcceptor, and FIX::ThreadedSocketAcceptor::AcceptorThreadInfo::m_socket.

Referenced by socketAcceptorThread().

◆ onConfigure()

void FIX::ThreadedSocketAcceptor::onConfigure ( const SessionSettings )
throw (ConfigError
)
privatevirtual

Implemented to configure acceptor.

Reimplemented from FIX::Acceptor.

Definition at line 71 of file ThreadedSocketAcceptor.cpp.

72 {
73  short port = 0;
74  std::set<int> ports;
75 
76  std::set<SessionID> sessions = s.getSessions();
77  std::set<SessionID>::iterator i = sessions.begin();
78  for( ; i != sessions.end(); ++i )
79  {
80  const Dictionary& settings = s.get( *i );
81  port = (short)settings.getInt( SOCKET_ACCEPT_PORT );
82 
83  m_portToSessions[port].insert( *i );
84 
85  if( ports.find(port) != ports.end() )

References FIX::Dictionary::getInt(), and FIX::SOCKET_ACCEPT_PORT.

◆ onInitialize()

void FIX::ThreadedSocketAcceptor::onInitialize ( const SessionSettings )
throw (RuntimeError
)
privatevirtual

Implemented to initialize acceptor.

Reimplemented from FIX::Acceptor.

Definition at line 87 of file ThreadedSocketAcceptor.cpp.

90  : true;
91 
92  const bool noDelay = settings.has( SOCKET_NODELAY ) ?
93  settings.getBool( SOCKET_NODELAY ) : false;
94 
95  const int sendBufSize = settings.has( SOCKET_SEND_BUFFER_SIZE ) ?
96  settings.getInt( SOCKET_SEND_BUFFER_SIZE ) : 0;
97 
98  const int rcvBufSize = settings.has( SOCKET_RECEIVE_BUFFER_SIZE ) ?
99  settings.getInt( SOCKET_RECEIVE_BUFFER_SIZE ) : 0;
100 
101  int socket = socket_createAcceptor( port, reuseAddress );
102  if( socket < 0 )
103  {
104  SocketException e;
105  socket_close( socket );
106  throw RuntimeError( "Unable to create, bind, or listen to port "
107  + IntConvertor::convert( (unsigned short)port ) + " (" + e.what() + ")" );
108  }
109  if( noDelay )
110  socket_setsockopt( socket, TCP_NODELAY );
111  if( sendBufSize )
112  socket_setsockopt( socket, SO_SNDBUF, sendBufSize );
113  if( rcvBufSize )
114  socket_setsockopt( socket, SO_RCVBUF, rcvBufSize );
115 
116  m_socketToPort[socket] = port;
117  m_sockets.insert( socket );
118  }
119 }
120 
122 {
123  Sockets::iterator i;
124  for( i = m_sockets.begin(); i != m_sockets.end(); ++i )
125  {
126  Locker l( m_mutex );
127  int port = m_socketToPort[*i];
128  AcceptorThreadInfo* info = new AcceptorThreadInfo( this, *i, port );
129  thread_id thread;
130  thread_spawn( &socketAcceptorThread, info, thread );
131  addThread( *i, thread );
132  }
133 }
134 
135 bool ThreadedSocketAcceptor::onPoll( double timeout )
136 {

◆ onPoll()

bool FIX::ThreadedSocketAcceptor::onPoll ( double  second)
privatevirtual

Implemented to connect and poll for events.

Implements FIX::Acceptor.

Definition at line 152 of file ThreadedSocketAcceptor.cpp.

153  {
154  if( ::time(&now) -5 >= start )
155  break;

References FIX::Acceptor::start().

◆ onStart()

void FIX::ThreadedSocketAcceptor::onStart ( )
privatevirtual

Implemented to start listening for connections.

Implements FIX::Acceptor.

Definition at line 138 of file ThreadedSocketAcceptor.cpp.

141 {
142  SocketToThread threads;
143  SocketToThread::iterator i;
144 
145  {
146  Locker l(m_mutex);
147 
148  time_t start = 0;
149  time_t now = 0;
150 

◆ onStop()

void FIX::ThreadedSocketAcceptor::onStop ( )
privatevirtual

Implemented to stop a running acceptor.

Implements FIX::Acceptor.

Definition at line 157 of file ThreadedSocketAcceptor.cpp.

169 {
170  Locker l(m_mutex);
171 
172  m_threads[ s ] = t;
173 }
174 
176 {
177  Locker l(m_mutex);
178  SocketToThread::iterator i = m_threads.find( s );
179  if ( i != m_threads.end() )
180  {
181  thread_detach( i->second );
182  m_threads.erase( i );
183  }

◆ readSettings()

bool FIX::ThreadedSocketAcceptor::readSettings ( const SessionSettings )
private

◆ removeThread()

void FIX::ThreadedSocketAcceptor::removeThread ( int  s)
private

Definition at line 192 of file ThreadedSocketAcceptor.cpp.

204  {

◆ socketAcceptorThread()

THREAD_PROC FIX::ThreadedSocketAcceptor::socketAcceptorThread ( void *  p)
staticprivate

Definition at line 203 of file ThreadedSocketAcceptor.cpp.

204  {
205  if( noDelay )
206  socket_setsockopt( socket, TCP_NODELAY );
207  if( sendBufSize )
208  socket_setsockopt( socket, SO_SNDBUF, sendBufSize );
209  if( rcvBufSize )
210  socket_setsockopt( socket, SO_RCVBUF, rcvBufSize );
211 
212  Sessions sessions = pAcceptor->m_portToSessions[port];
213 
214  ThreadedSocketConnection * pConnection =
215  new ThreadedSocketConnection
216  ( socket, sessions, pAcceptor->getLog() );
217 
218  ConnectionThreadInfo* info = new ConnectionThreadInfo( pAcceptor, pConnection );
219 
220  {
221  Locker l( pAcceptor->m_mutex );
222 
223  std::stringstream stream;
224  stream << "Accepted connection from " << socket_peername( socket ) << " on port " << port;
225 
226  if( pAcceptor->getLog() )
227  pAcceptor->getLog()->onEvent( stream.str() );
228 
229  thread_id thread;
230  if ( !thread_spawn( &socketConnectionThread, info, thread ) )
231  {
232  delete info;
233  delete pConnection;
234  }
235  else
236  pAcceptor->addThread( socket, thread );
237  }
238  }
239 
240  if( !pAcceptor->isStopped() )
241  pAcceptor->removeThread( s );
242 
243  return 0;
244 }
245 
247 {
248  ConnectionThreadInfo * info = reinterpret_cast < ConnectionThreadInfo* > ( p );
249 
250  ThreadedSocketAcceptor* pAcceptor = info->m_pAcceptor;
251  ThreadedSocketConnection* pConnection = info->m_pConnection;
252  delete info;
253 
254  int socket = pConnection->getSocket();
255 
256  while ( pConnection->read() ) {}
257  delete pConnection;
258  if( !pAcceptor->isStopped() )
259  pAcceptor->removeThread( socket );
260  return 0;
261 }

References addThread(), FIX::Acceptor::getLog(), m_mutex, m_portToSessions, FIX::Log::onEvent(), FIX::socket_peername(), FIX::socket_setsockopt(), socketConnectionThread(), and FIX::thread_spawn().

◆ socketConnectionThread()

THREAD_PROC FIX::ThreadedSocketAcceptor::socketConnectionThread ( void *  p)
staticprivate

Definition at line 263 of file ThreadedSocketAcceptor.cpp.

Referenced by socketAcceptorThread().

Friends And Related Function Documentation

◆ SocketConnection

friend class SocketConnection
friend

Definition at line 72 of file ThreadedSocketAcceptor.h.

Member Data Documentation

◆ m_mutex

Mutex FIX::ThreadedSocketAcceptor::m_mutex
private

Definition at line 127 of file ThreadedSocketAcceptor.h.

Referenced by socketAcceptorThread().

◆ m_portToSessions

PortToSessions FIX::ThreadedSocketAcceptor::m_portToSessions
private

Definition at line 124 of file ThreadedSocketAcceptor.h.

Referenced by socketAcceptorThread().

◆ m_sockets

Sockets FIX::ThreadedSocketAcceptor::m_sockets
private

Definition at line 123 of file ThreadedSocketAcceptor.h.

◆ m_socketToPort

SocketToPort FIX::ThreadedSocketAcceptor::m_socketToPort
private

Definition at line 125 of file ThreadedSocketAcceptor.h.

◆ m_threads

SocketToThread FIX::ThreadedSocketAcceptor::m_threads
private

Definition at line 126 of file ThreadedSocketAcceptor.h.


The documentation for this class was generated from the following files:
FIX::ThreadedSocketAcceptor::onStart
void onStart()
Implemented to start listening for connections.
Definition: ThreadedSocketAcceptor.cpp:138
FIX::thread_id
pthread_t thread_id
Definition: Utility.h:190
FIX::SOCKET_RECEIVE_BUFFER_SIZE
const char SOCKET_RECEIVE_BUFFER_SIZE[]
Definition: SessionSettings.h:87
FIX::ThreadedSocketAcceptor::m_sockets
Sockets m_sockets
Definition: ThreadedSocketAcceptor.h:123
FIX::SOCKET_NODELAY
const char SOCKET_NODELAY[]
Definition: SessionSettings.h:85
FIX::ThreadedSocketAcceptor::removeThread
void removeThread(int s)
Definition: ThreadedSocketAcceptor.cpp:192
FIX::socket_setsockopt
int socket_setsockopt(int s, int opt)
Definition: Utility.cpp:225
FIX::ThreadedSocketAcceptor::m_socketToPort
SocketToPort m_socketToPort
Definition: ThreadedSocketAcceptor.h:125
FIX::ThreadedSocketAcceptor::onConfigure
void onConfigure(const SessionSettings &)
Implemented to configure acceptor.
Definition: ThreadedSocketAcceptor.cpp:71
FIX::ThreadedSocketAcceptor::m_threads
SocketToThread m_threads
Definition: ThreadedSocketAcceptor.h:126
FIX::Acceptor::start
void start()
Start acceptor.
Definition: Acceptor.cpp:175
FIX::SOCKET_SEND_BUFFER_SIZE
const char SOCKET_SEND_BUFFER_SIZE[]
Definition: SessionSettings.h:86
FIX::ThreadedSocketAcceptor::SocketToThread
std::map< int, thread_id > SocketToThread
Definition: ThreadedSocketAcceptor.h:109
FIX::thread_detach
void thread_detach(thread_id thread)
Definition: Utility.cpp:464
FIX::socket_term
void socket_term()
Definition: Utility.cpp:113
FIX::SOCKET_ACCEPT_PORT
const char SOCKET_ACCEPT_PORT[]
Definition: SessionSettings.h:79
THREAD_PROC
#define THREAD_PROC
Definition: Utility.h:184
FIX::IntConvertor::convert
static std::string convert(signed_int value)
Definition: FieldConvertors.h:170
FIX::ThreadedSocketAcceptor::socketAcceptorThread
static THREAD_PROC socketAcceptorThread(void *p)
Definition: ThreadedSocketAcceptor.cpp:203
FIX::ThreadedSocketAcceptor::Sessions
std::set< SessionID > Sessions
Definition: ThreadedSocketAcceptor.h:106
FIX::ThreadedSocketAcceptor::socketConnectionThread
static THREAD_PROC socketConnectionThread(void *p)
Definition: ThreadedSocketAcceptor.cpp:263
FIX::ThreadedSocketAcceptor::addThread
void addThread(int s, thread_id t)
Definition: ThreadedSocketAcceptor.cpp:185
FIX::ThreadedSocketAcceptor::m_portToSessions
PortToSessions m_portToSessions
Definition: ThreadedSocketAcceptor.h:124
FIX::ThreadedSocketAcceptor::onPoll
bool onPoll(double timeout)
Implemented to connect and poll for events.
Definition: ThreadedSocketAcceptor.cpp:152
FIX::ThreadedSocketAcceptor::ThreadedSocketAcceptor
ThreadedSocketAcceptor(Application &, MessageStoreFactory &, const SessionSettings &)
Definition: ThreadedSocketAcceptor.cpp:49
FIX::socket_createAcceptor
int socket_createAcceptor(int port, bool reuse)
Definition: Utility.cpp:137
FIX::ThreadedSocketAcceptor::m_mutex
Mutex m_mutex
Definition: ThreadedSocketAcceptor.h:127
FIX::socket_close
void socket_close(int s)
Definition: Utility.cpp:197
FIX::SOCKET_REUSE_ADDRESS
const char SOCKET_REUSE_ADDRESS[]
Definition: SessionSettings.h:80
FIX::thread_spawn
bool thread_spawn(THREAD_START_ROUTINE func, void *var, thread_id &thread)
Definition: Utility.cpp:433
FIX::socket_peername
const char * socket_peername(int socket)
Definition: Utility.cpp:370

Generated on Wed Apr 29 2020 19:41:30 for QuickFIX by doxygen 1.8.17 written by Dimitri van Heesch, © 1997-2001