FIX::ThreadedSocketInitiator Class Reference

Threaded Socket implementation of Initiator.

#include <ThreadedSocketInitiator.h>

Inherits FIX::Initiator.

Public Member Functions

 ThreadedSocketInitiator (Application &, MessageStoreFactory &, const SessionSettings &) throw ( ConfigError )
 
 ThreadedSocketInitiator (Application &, MessageStoreFactory &, const SessionSettings &, LogFactory &) throw ( ConfigError )
 
virtual ~ThreadedSocketInitiator ()
 
- Public Member Functions inherited from FIX::Initiator
 Initiator (Application &, MessageStoreFactory &, const SessionSettings &) throw ( ConfigError )
 
 Initiator (Application &, MessageStoreFactory &, const SessionSettings &, LogFactory &) throw ( ConfigError )
 
virtual ~Initiator ()
 
void start () throw ( ConfigError, RuntimeError )
 Start initiator.
 
void block () throw ( ConfigError, RuntimeError )
 Block on the initiator.
 
bool poll (double timeout=0.0) throw ( ConfigError, RuntimeError )
 Poll the initiator.
 
void stop (bool force=false)
 Stop initiator.
 
bool isLoggedOn ()
 Check to see if any sessions are currently logged on.
 
Session * getSession (const SessionID &sessionID, Responder &)
 
const std::set< SessionID > & getSessions () const
 
Session * getSession (const SessionID &sessionID) const
 
const Dictionary *const getSessionSettings (const SessionID &sessionID) const
 
bool has (const SessionID &id)
 
bool isStopped ()
 
Application & getApplication ()
 
MessageStoreFactory & getMessageStoreFactory ()
 
Log * getLog ()
 

Private Types

typedef std::map< int, thread_id > SocketToThread
 
typedef std::map< SessionID, int > SessionToHostNum
 
typedef std::pair< ThreadedSocketInitiator *, ThreadedSocketConnection *> ThreadPair
 

Private Member Functions

void onConfigure (const SessionSettings &) throw ( ConfigError )
 Implemented to configure initiator.
 
void onInitialize (const SessionSettings &) throw ( RuntimeError )
 Implemented to initialize initiator.
 
void onStart ()
 Implemented to start connecting to targets.
 
bool onPoll (double timeout)
 Implemented to connect and poll for events.
 
void onStop ()
 Implemented to stop a running initiator.
 
void doConnect (const SessionID &s, const Dictionary &d)
 Implemented to connect a session to its target.
 
void addThread (int s, thread_id t)
 
void removeThread (int s)
 
void lock ()
 
void getHost (const SessionID &, const Dictionary &, std::string &, short &, std::string &, short &)
 

Static Private Member Functions

static THREAD_PROC socketThread (void *p)
 

Private Attributes

SessionSettings m_settings
 
SessionToHostNum m_sessionToHostNum
 
time_t m_lastConnect
 
int m_reconnectInterval
 
bool m_noDelay
 
int m_sendBufSize
 
int m_rcvBufSize
 
SocketToThread m_threads
 
Mutex m_mutex
 

Additional Inherited Members

- Protected Member Functions inherited from FIX::Initiator
void setPending (const SessionID &)
 
void setConnected (const SessionID &)
 
void setDisconnected (const SessionID &)
 
bool isPending (const SessionID &)
 
bool isConnected (const SessionID &)
 
bool isDisconnected (const SessionID &)
 
void connect ()
 
- Protected Attributes inherited from FIX::Initiator
SessionSettings m_settings
 

Detailed Description

Threaded Socket implementation of Initiator.

Definition at line 39 of file ThreadedSocketInitiator.h.

Member Typedef Documentation

◆ SessionToHostNum

typedef std::map< SessionID, int > FIX::ThreadedSocketInitiator::SessionToHostNum
private

Definition at line 52 of file ThreadedSocketInitiator.h.

◆ SocketToThread

typedef std::map< int, thread_id > FIX::ThreadedSocketInitiator::SocketToThread
private

Definition at line 51 of file ThreadedSocketInitiator.h.

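◆ ThreadPair

typedef std::pair< ThreadedSocketInitiator *, ThreadedSocketConnection *> FIX::ThreadedSocketInitiator::ThreadPair
private

Definition at line 53 of file ThreadedSocketInitiator.h.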

Constructor & Destructor Documentation

◆ ThreadedSocketInitiator() [1/2]

FIX::ThreadedSocketInitiator::ThreadedSocketInitiator ( Application & application, MessageStoreFactory & factory, const SessionSettings & settings ) throw ( ConfigError )

Definition at line 32 of file ThreadedSocketInitiator.cpp.

References FIX::socket_init().

: Initiator( application, factory, settings ),
  m_lastConnect( 0 ), m_reconnectInterval( 30 ), m_noDelay( false ),
  m_sendBufSize( 0 ), m_rcvBufSize( 0 )
{
  socket_init();
}

◆ ThreadedSocketInitiator() [2/2]

FIX::ThreadedSocketInitiator::ThreadedSocketInitiator ( Application & application, MessageStoreFactory & factory, const SessionSettings & settings, LogFactory & logFactory ) throw ( ConfigError )

Definition at line 43 of file ThreadedSocketInitiator.cpp.

References FIX::socket_init().

: Initiator( application, factory, settings, logFactory ),
  m_lastConnect( 0 ), m_reconnectInterval( 30 ), m_noDelay( false ),
  m_sendBufSize( 0 ), m_rcvBufSize( 0 )
{
  socket_init();
}

◆ ~ThreadedSocketInitiator()

FIX::ThreadedSocketInitiator::~ThreadedSocketInitiator ( )
virtual

Definition at line 55 of file ThreadedSocketInitiator.cpp.

References FIX::socket_term().

{
  socket_term();
}

Member Function Documentation

◆ addThread()

void FIX::ThreadedSocketInitiator::addThread ( int  s,
thread_id  t 
)
private

Definition at line 183 of file ThreadedSocketInitiator.cpp.

References m_mutex, and m_threads.

Referenced by doConnect().

{
  Locker l(m_mutex);

  m_threads[ s ] = t;
}
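
The `Locker l(m_mutex);` line is a scoped lock: the mutex is held until `l` goes out of scope. A minimal sketch of the same bookkeeping follows, using C++11 `std::mutex` and `std::lock_guard` as stand-ins for QuickFIX's `Mutex` and `Locker`. The class name `ThreadRegistry` and the use of a plain `int` as the thread handle are assumptions made for illustration; this is not the library's code.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <mutex>

// Hypothetical stand-in for the socket-to-thread table; not QuickFIX code.
class ThreadRegistry
{
public:
  // Record which thread handle services socket s.
  void add( int s, int threadHandle )
  {
    assert( s >= 0 );                               // socket descriptors are non-negative
    std::lock_guard<std::mutex> guard( m_mutex );   // released when guard leaves scope
    m_threads[ s ] = threadHandle;
  }

  // Forget socket s; returns true if an entry was actually removed.
  bool remove( int s )
  {
    std::lock_guard<std::mutex> guard( m_mutex );
    return m_threads.erase( s ) == 1;
  }

  // Number of sockets currently tracked.
  std::size_t size() const
  {
    std::lock_guard<std::mutex> guard( m_mutex );
    return m_threads.size();
  }

private:
  mutable std::mutex m_mutex;
  std::map<int, int> m_threads;
};
```

Because every accessor takes the lock itself, callers never have to remember to unlock, including on early return or exception.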

◆ doConnect()

void FIX::ThreadedSocketInitiator::doConnect ( const SessionID & s, const Dictionary & d )
private virtual

Implemented to connect a session to its target.

Implements FIX::Initiator.

Definition at line 133 of file ThreadedSocketInitiator.cpp.

References addThread(), FIX::IntConvertor::convert(), FIX::ThreadedSocketConnection::disconnect(), getHost(), FIX::Initiator::getLog(), FIX::Session::getLog(), FIX::Session::isSessionTime(), FIX::Session::lookupSession(), m_mutex, m_noDelay, m_rcvBufSize, m_sendBufSize, FIX::Log::onEvent(), FIX::Initiator::setDisconnected(), FIX::Initiator::setPending(), FIX::socket_createConnector(), FIX::socket_setsockopt(), socketThread(), and FIX::thread_spawn().

{
  try
  {
    Session* session = Session::lookupSession( s );
    if( !session->isSessionTime(UtcTimeStamp()) ) return;

    Log* log = session->getLog();

    std::string address;
    short port = 0;
    std::string sourceAddress;
    short sourcePort = 0;
    getHost( s, d, address, port, sourceAddress, sourcePort );

    int socket = socket_createConnector();
    if( m_noDelay )
      socket_setsockopt( socket, TCP_NODELAY );
    if( m_sendBufSize )
      socket_setsockopt( socket, SO_SNDBUF, m_sendBufSize );
    if( m_rcvBufSize )
      socket_setsockopt( socket, SO_RCVBUF, m_rcvBufSize );

    setPending( s );
    log->onEvent( "Connecting to " + address + " on port " + IntConvertor::convert((unsigned short)port) + " (Source " + sourceAddress + ":" + IntConvertor::convert((unsigned short)sourcePort) + ")");

    ThreadedSocketConnection* pConnection =
      new ThreadedSocketConnection( s, socket, address, port, getLog(), sourceAddress, sourcePort );

    ThreadPair* pair = new ThreadPair( this, pConnection );

    {
      Locker l( m_mutex );
      thread_id thread;
      if ( thread_spawn( &socketThread, pair, thread ) )
      {
        addThread( socket, thread );
      }
      else
      {
        delete pair;
        pConnection->disconnect();
        delete pConnection;
        setDisconnected( s );
      }
    }
  }
  catch ( std::exception& ) {}
}
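
The listing hands a heap-allocated `ThreadPair` to the new thread through a `void*` and cleans up itself only when the spawn fails. A minimal sketch of that ownership handoff is shown here. All names (`Owner`, `Connection`, `Handoff`, `spawnInline`, `spawnFail`, `launch`) are hypothetical, and the two spawn functions are stand-ins that run the entry point inline or refuse to run it, so the example stays deterministic and needs no real threads.

```cpp
#include <cassert>
#include <utility>

// Hypothetical types for illustration; none of these names are QuickFIX's.
struct Owner { int served; Owner() : served( 0 ) {} };
struct Connection { int id; explicit Connection( int i ) : id( i ) {} };
typedef std::pair<Owner*, Connection*> Handoff;
typedef void (*Entry)( void* );

// Thread entry point: from here on it owns the carrier pair and the connection.
void worker( void* p )
{
  assert( p != 0 );
  Handoff* h = static_cast<Handoff*>( p );
  Owner* owner = h->first;
  Connection* conn = h->second;
  delete h;            // the pair only carries two pointers across the void* boundary
  ++owner->served;     // stand-in for the connect/read work
  delete conn;
}

// Stand-ins for a thread-spawn call: one runs the entry inline, one always fails.
bool spawnInline( Entry f, void* arg ) { f( arg ); return true; }
bool spawnFail( Entry, void* ) { return false; }

// Returns true if the worker was started; on failure nothing is leaked.
bool launch( Owner& owner, int id, bool (*spawn)( Entry, void* ) )
{
  Connection* conn = new Connection( id );
  Handoff* h = new Handoff( &owner, conn );
  if( spawn( &worker, h ) )
    return true;       // ownership has moved to the worker
  delete h;            // the worker never ran, so the caller still owns both objects
  delete conn;
  return false;
}
```

The rule the sketch tries to make visible: exactly one side deletes each object, and which side that is depends only on whether the spawn call reported success.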

◆ getHost()

void FIX::ThreadedSocketInitiator::getHost ( const SessionID & s, const Dictionary & d, std::string & address, short & port, std::string & sourceAddress, short & sourcePort )
private

Definition at line 240 of file ThreadedSocketInitiator.cpp.

References FIX::Dictionary::getInt(), FIX::Dictionary::getString(), FIX::Dictionary::has(), m_sessionToHostNum, FIX::SOCKET_CONNECT_HOST, FIX::SOCKET_CONNECT_PORT, FIX::SOCKET_CONNECT_SOURCE_HOST, and FIX::SOCKET_CONNECT_SOURCE_PORT.

Referenced by doConnect(), and lock().

{
  int num = 0;
  SessionToHostNum::iterator i = m_sessionToHostNum.find( s );
  if ( i != m_sessionToHostNum.end() ) num = i->second;

  std::stringstream hostStream;
  hostStream << SOCKET_CONNECT_HOST << num;
  std::string hostString = hostStream.str();

  std::stringstream portStream;
  portStream << SOCKET_CONNECT_PORT << num;
  std::string portString = portStream.str();

  if( d.has(hostString) && d.has(portString) )
  {
    address = d.getString( hostString );
    port = ( short ) d.getInt( portString );

    std::stringstream sourceHostStream;
    sourceHostStream << SOCKET_CONNECT_SOURCE_HOST << num;
    hostString = sourceHostStream.str();
    if( d.has(hostString) )
      sourceAddress = d.getString( hostString );

    std::stringstream sourcePortStream;
    sourcePortStream << SOCKET_CONNECT_SOURCE_PORT << num;
    portString = sourcePortStream.str();
    if( d.has(portString) )
      sourcePort = ( short ) d.getInt( portString );
  }
  else
  {
    num = 0;
    address = d.getString( SOCKET_CONNECT_HOST );
    port = ( short ) d.getInt( SOCKET_CONNECT_PORT );

    if( d.has(SOCKET_CONNECT_SOURCE_HOST) )
      sourceAddress = d.getString( SOCKET_CONNECT_SOURCE_HOST );
    if( d.has(SOCKET_CONNECT_SOURCE_PORT) )
      sourcePort = ( short ) d.getInt( SOCKET_CONNECT_SOURCE_PORT );
  }

  m_sessionToHostNum[ s ] = ++num;
}
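
The effect of the per-session counter in `getHost()` is a round-robin over numbered host entries that falls back to the unnumbered entry when the next number is missing. A minimal sketch of that rotation follows. It assumes the key prefix is `SocketConnectHost` (the presumed value of `SOCKET_CONNECT_HOST`), uses a `std::map` as a stand-in for `FIX::Dictionary`, and leaves out the port and source-address handling; `nextHost` is a hypothetical helper, not a QuickFIX function.

```cpp
#include <cassert>
#include <map>
#include <sstream>
#include <string>

// Hypothetical stand-in for FIX::Dictionary.
typedef std::map<std::string, std::string> Settings;

// Returns the host to try next and advances the per-session counter hostNum.
std::string nextHost( const Settings& d, int& hostNum )
{
  std::ostringstream key;
  key << "SocketConnectHost" << hostNum;      // assumed key prefix
  Settings::const_iterator i = d.find( key.str() );
  if( i == d.end() )
  {
    // No numbered entry at this index: wrap around to the unnumbered key.
    hostNum = 0;
    i = d.find( "SocketConnectHost" );
    assert( i != d.end() );                   // the listing's getString() call handles this case itself
  }
  ++hostNum;                                  // the next call tries the following index
  return i->second;
}
```

With entries `SocketConnectHost`, `SocketConnectHost1`, and `SocketConnectHost2`, successive calls starting from `hostNum = 0` walk the primary, then each backup, then return to the primary.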

◆ lock()

void FIX::ThreadedSocketInitiator::lock ( )
inline private

Definition at line 66 of file ThreadedSocketInitiator.h.

References getHost(), m_mutex, socketThread(), and THREAD_PROC.

◆ onConfigure()

void FIX::ThreadedSocketInitiator::onConfigure ( const SessionSettings & s ) throw ( ConfigError )
private virtual

Implemented to configure initiator.

Reimplemented from FIX::Initiator.

Definition at line 60 of file ThreadedSocketInitiator.cpp.

References FIX::Dictionary::getBool(), FIX::Dictionary::getInt(), FIX::Dictionary::has(), m_noDelay, m_rcvBufSize, m_reconnectInterval, m_sendBufSize, FIX::RECONNECT_INTERVAL, FIX::SOCKET_NODELAY, FIX::SOCKET_RECEIVE_BUFFER_SIZE, and FIX::SOCKET_SEND_BUFFER_SIZE.

{
  const Dictionary& dict = s.get();

  if( dict.has( RECONNECT_INTERVAL ) )
    m_reconnectInterval = dict.getInt( RECONNECT_INTERVAL );
  if( dict.has( SOCKET_NODELAY ) )
    m_noDelay = dict.getBool( SOCKET_NODELAY );
  if( dict.has( SOCKET_SEND_BUFFER_SIZE ) )
    m_sendBufSize = dict.getInt( SOCKET_SEND_BUFFER_SIZE );
  if( dict.has( SOCKET_RECEIVE_BUFFER_SIZE ) )
    m_rcvBufSize = dict.getInt( SOCKET_RECEIVE_BUFFER_SIZE );
}
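
`onConfigure()` follows an "override only if present" pattern: each member keeps its constructor default unless the matching key exists in the default settings. A minimal sketch of that pattern is given here. The key strings (`ReconnectInterval`, `SocketNodelay`, `SocketSendBufferSize`, `SocketReceiveBufferSize`) and the `"Y"` boolean spelling are assumptions about the constants' values; `Settings`, `SocketOptions`, `lookup`, and `readOptions` are hypothetical names.

```cpp
#include <cstdlib>
#include <map>
#include <string>

// Hypothetical stand-in for FIX::Dictionary.
typedef std::map<std::string, std::string> Settings;

struct SocketOptions
{
  int reconnectInterval;   // seconds between connection attempts
  bool noDelay;
  int sendBufSize;         // 0 leaves the socket's buffer size untouched
  int rcvBufSize;
  // Same defaults as the constructors' initializer lists.
  SocketOptions() : reconnectInterval( 30 ), noDelay( false ), sendBufSize( 0 ), rcvBufSize( 0 ) {}
};

// Copies the value for key into out and returns true if the key exists.
static bool lookup( const Settings& d, const char* key, std::string& out )
{
  Settings::const_iterator i = d.find( key );
  if( i == d.end() ) return false;
  out = i->second;
  return true;
}

// Starts from the defaults and overrides only the options that are configured.
SocketOptions readOptions( const Settings& d )
{
  SocketOptions o;
  std::string v;
  if( lookup( d, "ReconnectInterval", v ) )       o.reconnectInterval = std::atoi( v.c_str() );
  if( lookup( d, "SocketNodelay", v ) )           o.noDelay = ( v == "Y" );
  if( lookup( d, "SocketSendBufferSize", v ) )    o.sendBufSize = std::atoi( v.c_str() );
  if( lookup( d, "SocketReceiveBufferSize", v ) ) o.rcvBufSize = std::atoi( v.c_str() );
  return o;
}
```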

◆ onInitialize()

void FIX::ThreadedSocketInitiator::onInitialize ( const SessionSettings & ) throw ( RuntimeError )
private virtual

Implemented to initialize initiator.

Reimplemented from FIX::Initiator.

Definition at line 75 of file ThreadedSocketInitiator.cpp.

{
}

◆ onPoll()

bool FIX::ThreadedSocketInitiator::onPoll ( double timeout )
private virtual

Implemented to connect and poll for events.

Implements FIX::Initiator.

Definition at line 98 of file ThreadedSocketInitiator.cpp.

{
  return false;
}

◆ onStart()

void FIX::ThreadedSocketInitiator::onStart ( )
private virtual

Implemented to start connecting to targets.

Implements FIX::Initiator.

Definition at line 80 of file ThreadedSocketInitiator.cpp.

References FIX::Initiator::connect(), FIX::Initiator::isStopped(), m_lastConnect, m_mutex, m_reconnectInterval, and FIX::process_sleep().

{
  while ( !isStopped() )
  {
    time_t now;
    ::time( &now );

    if ( (now - m_lastConnect) >= m_reconnectInterval )
    {
      Locker l( m_mutex );
      connect();
      m_lastConnect = now;
    }

    process_sleep( 1 );
  }
}
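
The loop in `onStart()` wakes once per second but only attempts connections when at least `m_reconnectInterval` seconds have passed since the last attempt. Because `m_lastConnect` starts at 0, the first pass always connects. A minimal sketch of that timing rule with the clock passed in explicitly, so it can be exercised without sleeping; `reconnectDue` and `tick` are hypothetical helpers, and the `attempts` counter stands in for the call to `connect()`.

```cpp
#include <ctime>

// True when a new connection attempt is due.
bool reconnectDue( std::time_t now, std::time_t lastConnect, int intervalSeconds )
{
  return ( now - lastConnect ) >= intervalSeconds;
}

// One loop iteration: maybe "connect", and return the updated last-attempt time.
std::time_t tick( std::time_t now, std::time_t lastConnect, int intervalSeconds, int& attempts )
{
  if( reconnectDue( now, lastConnect, intervalSeconds ) )
  {
    ++attempts;          // stand-in for connect()
    return now;          // remember when this attempt happened
  }
  return lastConnect;    // too soon: leave the timestamp unchanged
}
```

Calling `tick` with times 1000, 1010, and 1030 and an interval of 30 triggers an attempt on the first and third calls only.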

◆ onStop()

void FIX::ThreadedSocketInitiator::onStop ( )
private virtual

Implemented to stop a running initiator.

Implements FIX::Initiator.

Definition at line 103 of file ThreadedSocketInitiator.cpp.

References FIX::Initiator::isLoggedOn(), m_mutex, m_threads, FIX::socket_close(), FIX::Initiator::start(), and FIX::thread_join().

{
  SocketToThread threads;
  SocketToThread::iterator i;

  {
    Locker l(m_mutex);

    time_t start = 0;
    time_t now = 0;

    ::time( &start );
    while ( isLoggedOn() )
    {
      if( ::time(&now) -5 >= start )
        break;
    }

    threads = m_threads;
    m_threads.clear();
  }

  for ( i = threads.begin(); i != threads.end(); ++i )
    socket_close( i->first );

  for ( i = threads.begin(); i != threads.end(); ++i )
    thread_join( i->second );
  threads.clear();
}
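
`onStop()` copies the thread table while holding the mutex, releases the mutex, and only then closes sockets and joins. Joining outside the lock matters because a worker that is shutting down may itself need the same mutex (as `removeThread()` does). A minimal C++11 sketch of that snapshot-then-join pattern follows; `Workers` and its members are hypothetical names, an atomic flag stands in for closing the sockets, and distinct ids per `start()` call are assumed.

```cpp
#include <atomic>
#include <map>
#include <mutex>
#include <thread>

// Hypothetical worker pool illustrating snapshot-under-lock, join-outside-lock.
class Workers
{
public:
  Workers() : m_stop( false ), m_finished( 0 ) {}

  // Start a worker keyed by id; it idles until asked to stop.
  void start( int id )
  {
    std::lock_guard<std::mutex> guard( m_mutex );
    m_threads[ id ] = std::thread( [this] {
      while( !m_stop ) std::this_thread::yield();
      std::lock_guard<std::mutex> inner( m_mutex );   // workers need the lock on exit
      ++m_finished;
    } );
  }

  // Stops every worker and returns how many were joined.
  int stopAll()
  {
    std::map<int, std::thread> snapshot;
    {
      std::lock_guard<std::mutex> guard( m_mutex );
      snapshot.swap( m_threads );    // take the whole table, leave it empty
    }
    m_stop = true;                   // stand-in for closing each socket
    int joined = 0;
    for( auto& entry : snapshot )
    {
      entry.second.join();           // safe: the lock is no longer held here
      ++joined;
    }
    return joined;
  }

  // Number of workers that ran to completion.
  int finished()
  {
    std::lock_guard<std::mutex> guard( m_mutex );
    return m_finished;
  }

private:
  std::mutex m_mutex;
  std::map<int, std::thread> m_threads;
  std::atomic<bool> m_stop;
  int m_finished;
};
```

If `stopAll()` joined while still holding `m_mutex`, each worker would block on its final lock acquisition and the join would never return.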

◆ removeThread()

void FIX::ThreadedSocketInitiator::removeThread ( int  s)
private

Definition at line 190 of file ThreadedSocketInitiator.cpp.

References m_mutex, m_threads, and FIX::thread_detach().

{
  Locker l(m_mutex);
  SocketToThread::iterator i = m_threads.find( s );

  if ( i != m_threads.end() )
  {
    thread_detach( i->second );
    m_threads.erase( i );
  }
}

◆ socketThread()

THREAD_PROC FIX::ThreadedSocketInitiator::socketThread ( void * p )
static private

Definition at line 202 of file ThreadedSocketInitiator.cpp.

References FIX::ThreadedSocketConnection::connect(), FIX::ThreadedSocketConnection::disconnect(), FIX::ThreadedSocketConnection::getSession(), FIX::Session::getSessionID(), FIX::ThreadedSocketConnection::getSocket(), FIX::Session::lookupSession(), FIX::Session::next(), and FIX::ThreadedSocketConnection::read().

Referenced by doConnect(), and lock().

{
  ThreadPair * pair = reinterpret_cast < ThreadPair* > ( p );

  ThreadedSocketInitiator* pInitiator = pair->first;
  ThreadedSocketConnection* pConnection = pair->second;
  FIX::SessionID sessionID = pConnection->getSession()->getSessionID();
  FIX::Session* pSession = FIX::Session::lookupSession( sessionID );
  int socket = pConnection->getSocket();
  delete pair;

  pInitiator->lock();

  if( !pConnection->connect() )
  {
    pInitiator->getLog()->onEvent( "Connection failed" );
    pConnection->disconnect();
    delete pConnection;
    pInitiator->removeThread( socket );
    pInitiator->setDisconnected( sessionID );
    return 0;
  }

  pInitiator->setConnected( sessionID );
  pInitiator->getLog()->onEvent( "Connection succeeded" );

  pSession->next();

  while ( pConnection->read() ) {}

  delete pConnection;
  if( !pInitiator->isStopped() )
    pInitiator->removeThread( socket );

  pInitiator->setDisconnected( sessionID );
  return 0;
}

Member Data Documentation

◆ m_lastConnect

time_t FIX::ThreadedSocketInitiator::m_lastConnect
private

Definition at line 73 of file ThreadedSocketInitiator.h.

Referenced by onStart().

◆ m_mutex

Mutex FIX::ThreadedSocketInitiator::m_mutex
private

Definition at line 79 of file ThreadedSocketInitiator.h.

Referenced by addThread(), doConnect(), lock(), onStart(), onStop(), and removeThread().

◆ m_noDelay

bool FIX::ThreadedSocketInitiator::m_noDelay
private

Definition at line 75 of file ThreadedSocketInitiator.h.

Referenced by doConnect(), and onConfigure().

◆ m_rcvBufSize

int FIX::ThreadedSocketInitiator::m_rcvBufSize
private

Definition at line 77 of file ThreadedSocketInitiator.h.

Referenced by doConnect(), and onConfigure().

◆ m_reconnectInterval

int FIX::ThreadedSocketInitiator::m_reconnectInterval
private

Definition at line 74 of file ThreadedSocketInitiator.h.

Referenced by onConfigure(), and onStart().

◆ m_sendBufSize

int FIX::ThreadedSocketInitiator::m_sendBufSize
private

Definition at line 76 of file ThreadedSocketInitiator.h.

Referenced by doConnect(), and onConfigure().

◆ m_sessionToHostNum

SessionToHostNum FIX::ThreadedSocketInitiator::m_sessionToHostNum
private

Definition at line 72 of file ThreadedSocketInitiator.h.

Referenced by getHost().

◆ m_settings

SessionSettings FIX::ThreadedSocketInitiator::m_settings
private

Definition at line 71 of file ThreadedSocketInitiator.h.

◆ m_threads

SocketToThread FIX::ThreadedSocketInitiator::m_threads
private

Definition at line 78 of file ThreadedSocketInitiator.h.

Referenced by addThread(), onStop(), and removeThread().


The documentation for this class was generated from the following files:

ThreadedSocketInitiator.h
ThreadedSocketInitiator.cpp

Generated on Wed Aug 28 2019 14:13:46 for QuickFIX by doxygen 1.8.13 written by Dimitri van Heesch, © 1997-2001