ThreadedSocketInitiator.cpp
Go to the documentation of this file.
1 /****************************************************************************
2 ** Copyright (c) 2001-2014
3 **
4 ** This file is part of the QuickFIX FIX Engine
5 **
6 ** This file may be distributed under the terms of the quickfixengine.org
7 ** license as defined by quickfixengine.org and appearing in the file
8 ** LICENSE included in the packaging of this file.
9 **
10 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
11 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
12 **
13 ** See http://www.quickfixengine.org/LICENSE for licensing information.
14 **
15 ** Contact ask@quickfixengine.org if any conditions of this licensing are
16 ** not clear to you.
17 **
18 ****************************************************************************/
19 
20 #ifdef _MSC_VER
21 #include "stdafx.h"
22 #else
23 #include "config.h"
24 #endif
25 
27 #include "Session.h"
28 #include "Settings.h"
29 
30 namespace FIX
31 {
33  Application& application,
34  MessageStoreFactory& factory,
35  const SessionSettings& settings ) throw( ConfigError )
36 : Initiator( application, factory, settings ),
37  m_lastConnect( 0 ), m_reconnectInterval( 30 ), m_noDelay( false ),
38  m_sendBufSize( 0 ), m_rcvBufSize( 0 )
39 {
40  socket_init();
41 }
42 
44  Application& application,
45  MessageStoreFactory& factory,
46  const SessionSettings& settings,
47  LogFactory& logFactory ) throw( ConfigError )
48 : Initiator( application, factory, settings, logFactory ),
49  m_lastConnect( 0 ), m_reconnectInterval( 30 ), m_noDelay( false ),
50  m_sendBufSize( 0 ), m_rcvBufSize( 0 )
51 {
52  socket_init();
53 }
54 
56 {
57  socket_term();
58 }
59 
61 throw ( ConfigError )
62 {
63  const Dictionary& dict = s.get();
64 
65  if( dict.has( RECONNECT_INTERVAL ) )
67  if( dict.has( SOCKET_NODELAY ) )
69  if( dict.has( SOCKET_SEND_BUFFER_SIZE ) )
71  if( dict.has( SOCKET_RECEIVE_BUFFER_SIZE ) )
73 }
74 
76 throw ( RuntimeError )
77 {
78 }
79 
81 {
82  while ( !isStopped() )
83  {
84  time_t now;
85  ::time( &now );
86 
87  if ( (now - m_lastConnect) >= m_reconnectInterval )
88  {
89  Locker l( m_mutex );
90  connect();
91  m_lastConnect = now;
92  }
93 
94  process_sleep( 1 );
95  }
96 }
97 
98 bool ThreadedSocketInitiator::onPoll( double timeout )
99 {
100  return false;
101 }
102 
104 {
105  SocketToThread threads;
106  SocketToThread::iterator i;
107 
108  {
109  Locker l(m_mutex);
110 
111  time_t start = 0;
112  time_t now = 0;
113 
114  ::time( &start );
115  while ( isLoggedOn() )
116  {
117  if( ::time(&now) -5 >= start )
118  break;
119  }
120 
121  threads = m_threads;
122  m_threads.clear();
123  }
124 
125  for ( i = threads.begin(); i != threads.end(); ++i )
126  socket_close( i->first );
127 
128  for ( i = threads.begin(); i != threads.end(); ++i )
129  thread_join( i->second );
130  threads.clear();
131 }
132 
134 {
135  try
136  {
137  Session* session = Session::lookupSession( s );
138  if( !session->isSessionTime(UtcTimeStamp()) ) return;
139 
140  Log* log = session->getLog();
141 
142  std::string address;
143  short port = 0;
144  std::string sourceAddress;
145  short sourcePort = 0;
146  getHost( s, d, address, port, sourceAddress, sourcePort );
147 
148  int socket = socket_createConnector();
149  if( m_noDelay )
150  socket_setsockopt( socket, TCP_NODELAY );
151  if( m_sendBufSize )
152  socket_setsockopt( socket, SO_SNDBUF, m_sendBufSize );
153  if( m_rcvBufSize )
154  socket_setsockopt( socket, SO_RCVBUF, m_rcvBufSize );
155 
156  setPending( s );
157  log->onEvent( "Connecting to " + address + " on port " + IntConvertor::convert((unsigned short)port) + " (Source " + sourceAddress + ":" + IntConvertor::convert((unsigned short)sourcePort) + ")");
158 
159  ThreadedSocketConnection* pConnection =
160  new ThreadedSocketConnection( s, socket, address, port, getLog(), sourceAddress, sourcePort );
161 
162  ThreadPair* pair = new ThreadPair( this, pConnection );
163 
164  {
165  Locker l( m_mutex );
166  thread_id thread;
167  if ( thread_spawn( &socketThread, pair, thread ) )
168  {
169  addThread( socket, thread );
170  }
171  else
172  {
173  delete pair;
174  pConnection->disconnect();
175  delete pConnection;
176  setDisconnected( s );
177  }
178  }
179  }
180  catch ( std::exception& ) {}
181 }
182 
184 {
185  Locker l(m_mutex);
186 
187  m_threads[ s ] = t;
188 }
189 
191 {
192  Locker l(m_mutex);
193  SocketToThread::iterator i = m_threads.find( s );
194 
195  if ( i != m_threads.end() )
196  {
197  thread_detach( i->second );
198  m_threads.erase( i );
199  }
200 }
201 
203 {
204  ThreadPair * pair = reinterpret_cast < ThreadPair* > ( p );
205 
206  ThreadedSocketInitiator* pInitiator = pair->first;
207  ThreadedSocketConnection* pConnection = pair->second;
208  FIX::SessionID sessionID = pConnection->getSession()->getSessionID();
209  FIX::Session* pSession = FIX::Session::lookupSession( sessionID );
210  int socket = pConnection->getSocket();
211  delete pair;
212 
213  pInitiator->lock();
214 
215  if( !pConnection->connect() )
216  {
217  pInitiator->getLog()->onEvent( "Connection failed" );
218  pConnection->disconnect();
219  delete pConnection;
220  pInitiator->removeThread( socket );
221  pInitiator->setDisconnected( sessionID );
222  return 0;
223  }
224 
225  pInitiator->setConnected( sessionID );
226  pInitiator->getLog()->onEvent( "Connection succeeded" );
227 
228  pSession->next();
229 
230  while ( pConnection->read() ) {}
231 
232  delete pConnection;
233  if( !pInitiator->isStopped() )
234  pInitiator->removeThread( socket );
235 
236  pInitiator->setDisconnected( sessionID );
237  return 0;
238 }
239 
241  std::string& address, short& port,
242  std::string& sourceAddress, short& sourcePort )
243 {
244  int num = 0;
245  SessionToHostNum::iterator i = m_sessionToHostNum.find( s );
246  if ( i != m_sessionToHostNum.end() ) num = i->second;
247 
248  std::stringstream hostStream;
249  hostStream << SOCKET_CONNECT_HOST << num;
250  std::string hostString = hostStream.str();
251 
252  std::stringstream portStream;
253  portStream << SOCKET_CONNECT_PORT << num;
254  std::string portString = portStream.str();
255 
256  if( d.has(hostString) && d.has(portString) )
257  {
258  address = d.getString( hostString );
259  port = ( short ) d.getInt( portString );
260 
261  std::stringstream sourceHostStream;
262  sourceHostStream << SOCKET_CONNECT_SOURCE_HOST << num;
263  hostString = sourceHostStream.str();
264  if( d.has(hostString) )
265  sourceAddress = d.getString( hostString );
266 
267  std::stringstream sourcePortStream;
268  sourcePortStream << SOCKET_CONNECT_SOURCE_PORT << num;
269  portString = sourcePortStream.str();
270  if( d.has(portString) )
271  sourcePort = ( short ) d.getInt( portString );
272  }
273  else
274  {
275  num = 0;
276  address = d.getString( SOCKET_CONNECT_HOST );
277  port = ( short ) d.getInt( SOCKET_CONNECT_PORT );
278 
280  sourceAddress = d.getString( SOCKET_CONNECT_SOURCE_HOST );
282  sourcePort = ( short ) d.getInt( SOCKET_CONNECT_SOURCE_PORT );
283  }
284 
285  m_sessionToHostNum[ s ] = ++num;
286 }
287 
288 }
#define THREAD_PROC
void thread_join(thread_id thread)
Definition: Utility.cpp:437
static Session * lookupSession(const SessionID &)
Definition: Session.cpp:1496
void socket_init()
Definition: Utility.cpp:81
Maintains the state and implements the logic of a FIX session.
Definition: Session.h:45
Encapsulates a socket file descriptor (multi-threaded).
std::pair< ThreadedSocketInitiator *, ThreadedSocketConnection *> ThreadPair
void socket_term()
Definition: Utility.cpp:96
const char RECONNECT_INTERVAL[]
void next()
Definition: Session.cpp:125
void thread_detach(thread_id thread)
Definition: Utility.cpp:447
int socket_createConnector()
Definition: Utility.cpp:143
const char SOCKET_NODELAY[]
void process_sleep(double s)
Definition: Utility.cpp:466
std::map< int, thread_id > SocketToThread
const char SOCKET_CONNECT_HOST[]
const char SOCKET_SEND_BUFFER_SIZE[]
bool thread_spawn(THREAD_START_ROUTINE func, void *var, thread_id &thread)
Definition: Utility.cpp:416
bool has(const std::string &) const
Check if the dictionary contains a value for key.
Definition: Dictionary.cpp:149
void onStop()
Implemented to stop a running initiator.
const char SOCKET_CONNECT_PORT[]
static std::string convert(signed_int value)
bool onPoll(double timeout)
Implemented to connect and poll for events.
ThreadedSocketInitiator(Application &, MessageStoreFactory &, const SessionSettings &)
static THREAD_PROC socketThread(void *p)
const char SOCKET_RECEIVE_BUFFER_SIZE[]
int socket_setsockopt(int s, int opt)
Definition: Utility.cpp:208
This interface must be implemented to log messages and events.
Definition: Log.h:81
Definition: Acceptor.cpp:34
const char SOCKET_CONNECT_SOURCE_PORT[]
virtual void onEvent(const std::string &)=0
Log * getLog()
Definition: Session.h:227
This interface must be implemented to define what your FIX application does.
Definition: Application.h:43
Application encountered serious error during runtime
Definition: Exceptions.h:94
Application is not configured correctly
Definition: Exceptions.h:87
Locks/Unlocks a mutex using RAII.
Definition: Mutex.h:95
void onConfigure(const SessionSettings &)
Implemented to configure acceptor.
Container for setting dictionaries mapped to sessions.
void start()
Start initiator.
Definition: Initiator.cpp:190
void onInitialize(const SessionSettings &)
Implemented to initialize initiator.
This interface must be implemented to create a Log.
Definition: Log.h:42
const SessionID & getSessionID() const
Definition: Session.h:75
Log * getLog()
Definition: Initiator.h:90
This interface must be implemented to create a MessageStore.
Definition: MessageStore.h:41
void onStart()
Implemented to start connecting to targets.
bool isLoggedOn()
Check to see if any sessions are currently logged on.
Definition: Initiator.cpp:275
pthread_t thread_id
Definition: Utility.h:190
Date and Time represented in UTC.
Definition: FieldTypes.h:582
For storage and retrieval of key/value pairs.
Definition: Dictionary.h:36
std::string getString(const std::string &, bool capitalize=false) const
Get a value as a string.
Definition: Dictionary.cpp:32
void doConnect(const SessionID &s, const Dictionary &d)
Implemented to connect a session to its target.
Unique session id consists of BeginString, SenderCompID and TargetCompID.
Definition: SessionID.h:30
void socket_close(int s)
Definition: Utility.cpp:180
bool isSessionTime(const UtcTimeStamp &time)
Definition: Session.h:108
void getHost(const SessionID &, const Dictionary &, std::string &, short &, std::string &, short &)
bool getBool(const std::string &) const
Get a value as a bool.
Definition: Dictionary.cpp:71
int getInt(const std::string &) const
Get a value as a int.
Definition: Dictionary.cpp:45
Threaded Socket implementation of Initiator.
bool isStopped()
Definition: Initiator.h:83
Base for classes which act as an initiator for establishing connections.
Definition: Initiator.h:51
const char SOCKET_CONNECT_SOURCE_HOST[]
void setPending(const SessionID &)
Definition: Initiator.cpp:145
void setDisconnected(const SessionID &)
Definition: Initiator.cpp:163

Generated on Wed Aug 28 2019 14:13:46 for QuickFIX by doxygen 1.8.13 written by Dimitri van Heesch, © 1997-2001