ThreadedSocketInitiator.cpp
Go to the documentation of this file.
1 /****************************************************************************
2 ** Copyright (c) 2001-2014
3 **
4 ** This file is part of the QuickFIX FIX Engine
5 **
6 ** This file may be distributed under the terms of the quickfixengine.org
7 ** license as defined by quickfixengine.org and appearing in the file
8 ** LICENSE included in the packaging of this file.
9 **
10 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
11 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
12 **
13 ** See http://www.quickfixengine.org/LICENSE for licensing information.
14 **
15 ** Contact ask@quickfixengine.org if any conditions of this licensing are
16 ** not clear to you.
17 **
18 ****************************************************************************/
19 
20 #ifdef _MSC_VER
21 #include "stdafx.h"
22 #else
23 #include "config.h"
24 #endif
25 
27 #include "Session.h"
28 #include "Settings.h"
29 
30 namespace FIX
31 {
33  Application& application,
34  MessageStoreFactory& factory,
35  const SessionSettings& settings ) throw( ConfigError )
36 : Initiator( application, factory, settings ),
37  m_lastConnect( 0 ), m_reconnectInterval( 30 ), m_noDelay( false ),
38  m_sendBufSize( 0 ), m_rcvBufSize( 0 )
39 {
40  socket_init();
41 }
42 
44  Application& application,
45  MessageStoreFactory& factory,
46  const SessionSettings& settings,
47  LogFactory& logFactory ) throw( ConfigError )
48 : Initiator( application, factory, settings, logFactory ),
49  m_lastConnect( 0 ), m_reconnectInterval( 30 ), m_noDelay( false ),
50  m_sendBufSize( 0 ), m_rcvBufSize( 0 )
51 {
52  socket_init();
53 }
54 
56 {
57  socket_term();
58 }
59 
61 throw ( ConfigError )
62 {
63  const Dictionary& dict = s.get();
64 
65  if( dict.has( RECONNECT_INTERVAL ) )
66  m_reconnectInterval = dict.getInt( RECONNECT_INTERVAL );
67  if( dict.has( SOCKET_NODELAY ) )
68  m_noDelay = dict.getBool( SOCKET_NODELAY );
69  if( dict.has( SOCKET_SEND_BUFFER_SIZE ) )
70  m_sendBufSize = dict.getInt( SOCKET_SEND_BUFFER_SIZE );
71  if( dict.has( SOCKET_RECEIVE_BUFFER_SIZE ) )
72  m_rcvBufSize = dict.getInt( SOCKET_RECEIVE_BUFFER_SIZE );
73 }
74 
76 throw ( RuntimeError )
77 {
78 }
79 
81 {
82  while ( !isStopped() )
83  {
84  time_t now;
85  ::time( &now );
86 
87  if ( (now - m_lastConnect) >= m_reconnectInterval )
88  {
89  Locker l( m_mutex );
90  connect();
91  m_lastConnect = now;
92  }
93 
94  process_sleep( 1 );
95  }
96 }
97 
98 bool ThreadedSocketInitiator::onPoll( double timeout )
99 {
100  return false;
101 }
102 
104 {
105  SocketToThread threads;
106  SocketToThread::iterator i;
107 
108  {
109  Locker l(m_mutex);
110 
111  time_t start = 0;
112  time_t now = 0;
113 
114  ::time( &start );
115  while ( isLoggedOn() )
116  {
117  if( ::time(&now) -5 >= start )
118  break;
119  }
120 
121  threads = m_threads;
122  m_threads.clear();
123  }
124 
125  for ( i = threads.begin(); i != threads.end(); ++i )
126  socket_close( i->first );
127 
128  for ( i = threads.begin(); i != threads.end(); ++i )
129  thread_join( i->second );
130  threads.clear();
131 }
132 
133 void ThreadedSocketInitiator::doConnect( const SessionID& s, const Dictionary& d )
134 {
135  try
136  {
137  Session* session = Session::lookupSession( s );
138  if( !session->isSessionTime(UtcTimeStamp()) ) return;
139 
140  Log* log = session->getLog();
141 
142  std::string address;
143  short port = 0;
144  std::string sourceAddress;
145  short sourcePort = 0;
146  getHost( s, d, address, port, sourceAddress, sourcePort );
147 
148  int socket = socket_createConnector();
149  if( m_noDelay )
150  socket_setsockopt( socket, TCP_NODELAY );
151  if( m_sendBufSize )
152  socket_setsockopt( socket, SO_SNDBUF, m_sendBufSize );
153  if( m_rcvBufSize )
154  socket_setsockopt( socket, SO_RCVBUF, m_rcvBufSize );
155 
156  setPending( s );
157  log->onEvent( "Connecting to " + address + " on port " + IntConvertor::convert((unsigned short)port) + " (Source " + sourceAddress + ":" + IntConvertor::convert((unsigned short)sourcePort) + ")");
158 
159  ThreadedSocketConnection* pConnection =
160  new ThreadedSocketConnection( s, socket, address, port, getLog(), sourceAddress, sourcePort );
161 
162  ThreadPair* pair = new ThreadPair( this, pConnection );
163 
164  {
165  Locker l( m_mutex );
166  thread_id thread;
167  if ( thread_spawn( &socketThread, pair, thread ) )
168  {
169  addThread( socket, thread );
170  }
171  else
172  {
173  delete pair;
174  pConnection->disconnect();
175  delete pConnection;
176  setDisconnected( s );
177  }
178  }
179  }
180  catch ( std::exception& ) {}
181 }
182 
184 {
185  Locker l(m_mutex);
186 
187  m_threads[ s ] = t;
188 }
189 
191 {
192  Locker l(m_mutex);
193  SocketToThread::iterator i = m_threads.find( s );
194 
195  if ( i != m_threads.end() )
196  {
197  thread_detach( i->second );
198  m_threads.erase( i );
199  }
200 }
201 
203 {
204  ThreadPair * pair = reinterpret_cast < ThreadPair* > ( p );
205 
206  ThreadedSocketInitiator* pInitiator = pair->first;
207  ThreadedSocketConnection* pConnection = pair->second;
208  FIX::SessionID sessionID = pConnection->getSession()->getSessionID();
209  FIX::Session* pSession = FIX::Session::lookupSession( sessionID );
210  int socket = pConnection->getSocket();
211  delete pair;
212 
213  pInitiator->lock();
214 
215  if( !pConnection->connect() )
216  {
217  pInitiator->getLog()->onEvent( "Connection failed" );
218  pConnection->disconnect();
219  delete pConnection;
220  pInitiator->removeThread( socket );
221  pInitiator->setDisconnected( sessionID );
222  return 0;
223  }
224 
225  pInitiator->setConnected( sessionID );
226  pInitiator->getLog()->onEvent( "Connection succeeded" );
227 
228  pSession->next();
229 
230  while ( pConnection->read() ) {}
231 
232  delete pConnection;
233  if( !pInitiator->isStopped() )
234  pInitiator->removeThread( socket );
235 
236  pInitiator->setDisconnected( sessionID );
237  return 0;
238 }
239 
240 void ThreadedSocketInitiator::getHost( const SessionID& s, const Dictionary& d,
241  std::string& address, short& port,
242  std::string& sourceAddress, short& sourcePort )
243 {
244  int num = 0;
245  SessionToHostNum::iterator i = m_sessionToHostNum.find( s );
246  if ( i != m_sessionToHostNum.end() ) num = i->second;
247 
248  std::stringstream hostStream;
249  hostStream << SOCKET_CONNECT_HOST << num;
250  std::string hostString = hostStream.str();
251 
252  std::stringstream portStream;
253  portStream << SOCKET_CONNECT_PORT << num;
254  std::string portString = portStream.str();
255 
256  if( d.has(hostString) && d.has(portString) )
257  {
258  address = d.getString( hostString );
259  port = ( short ) d.getInt( portString );
260 
261  std::stringstream sourceHostStream;
262  sourceHostStream << SOCKET_CONNECT_SOURCE_HOST << num;
263  hostString = sourceHostStream.str();
264  if( d.has(hostString) )
265  sourceAddress = d.getString( hostString );
266 
267  std::stringstream sourcePortStream;
268  sourcePortStream << SOCKET_CONNECT_SOURCE_PORT << num;
269  portString = sourcePortStream.str();
270  if( d.has(portString) )
271  sourcePort = ( short ) d.getInt( portString );
272  }
273  else
274  {
275  num = 0;
276  address = d.getString( SOCKET_CONNECT_HOST );
277  port = ( short ) d.getInt( SOCKET_CONNECT_PORT );
278 
279  if( d.has(SOCKET_CONNECT_SOURCE_HOST) )
280  sourceAddress = d.getString( SOCKET_CONNECT_SOURCE_HOST );
281  if( d.has(SOCKET_CONNECT_SOURCE_PORT) )
282  sourcePort = ( short ) d.getInt( SOCKET_CONNECT_SOURCE_PORT );
283  }
284 
285  m_sessionToHostNum[ s ] = ++num;
286 }
287 
288 }
FIX::thread_id
pthread_t thread_id
Definition: Utility.h:190
FIX::ThreadedSocketInitiator::getHost
void getHost(const SessionID &, const Dictionary &, std::string &, short &, std::string &, short &)
Definition: ThreadedSocketInitiator.cpp:257
FIX::Initiator::start
void start()
Start initiator.
Definition: Initiator.cpp:207
FIX::SOCKET_RECEIVE_BUFFER_SIZE
const char SOCKET_RECEIVE_BUFFER_SIZE[]
Definition: SessionSettings.h:87
FIX::ThreadedSocketInitiator::ThreadedSocketInitiator
ThreadedSocketInitiator(Application &, MessageStoreFactory &, const SessionSettings &)
Definition: ThreadedSocketInitiator.cpp:49
FIX::ThreadedSocketInitiator::m_mutex
Mutex m_mutex
Definition: ThreadedSocketInitiator.h:96
FIX::Session::lookupSession
static Session * lookupSession(const SessionID &)
Definition: Session.cpp:1513
FIX::SOCKET_NODELAY
const char SOCKET_NODELAY[]
Definition: SessionSettings.h:85
FIX::ThreadedSocketInitiator::onInitialize
void onInitialize(const SessionSettings &)
Implemented to initialize initiator.
Definition: ThreadedSocketInitiator.cpp:92
FIX::socket_setsockopt
int socket_setsockopt(int s, int opt)
Definition: Utility.cpp:225
FIX::SOCKET_CONNECT_SOURCE_PORT
const char SOCKET_CONNECT_SOURCE_PORT[]
Definition: SessionSettings.h:84
FIX::Dictionary::getInt
int getInt(const std::string &) const
Get a value as a int.
Definition: Dictionary.cpp:62
FIX::socket_init
void socket_init()
Definition: Utility.cpp:98
FIX::ThreadedSocketInitiator::onStart
void onStart()
Implemented to start connecting to targets.
Definition: ThreadedSocketInitiator.cpp:97
FIX::Initiator::connect
void connect()
Definition: Initiator.cpp:148
FIX::Session::getSessionID
const SessionID & getSessionID() const
Definition: Session.h:109
FIX::ThreadedSocketInitiator::lock
void lock()
Definition: ThreadedSocketInitiator.h:83
FIX::ThreadedSocketConnection::getSocket
int getSocket() const
Definition: ThreadedSocketConnection.h:74
FIX::Initiator::getLog
Log * getLog()
Definition: Initiator.h:107
FIX::SOCKET_CONNECT_SOURCE_HOST
const char SOCKET_CONNECT_SOURCE_HOST[]
Definition: SessionSettings.h:83
FIX::SOCKET_SEND_BUFFER_SIZE
const char SOCKET_SEND_BUFFER_SIZE[]
Definition: SessionSettings.h:86
FIX::thread_detach
void thread_detach(thread_id thread)
Definition: Utility.cpp:464
FIX::ThreadedSocketInitiator::m_sendBufSize
int m_sendBufSize
Definition: ThreadedSocketInitiator.h:93
FIX::ConfigError
Application is not configured correctly
Definition: Exceptions.h:104
FIX::socket_term
void socket_term()
Definition: Utility.cpp:113
FIX::RuntimeError
Application encountered serious error during runtime
Definition: Exceptions.h:111
FIX::SessionID
Unique session id consists of BeginString, SenderCompID and TargetCompID.
Definition: SessionID.h:47
FIX::Initiator::isLoggedOn
bool isLoggedOn()
Check to see if any sessions are currently logged on.
Definition: Initiator.cpp:292
FIX::ThreadedSocketConnection::disconnect
void disconnect()
Definition: ThreadedSocketConnection.cpp:108
FIX::Session::next
void next()
Definition: Session.cpp:142
FIX::TYPE::UtcTimeStamp
@ UtcTimeStamp
Definition: FieldTypes.h:940
FIX::ThreadedSocketConnection::getSession
Session * getSession() const
Definition: ThreadedSocketConnection.h:73
FIX::SessionSettings
Container for setting dictionaries mapped to sessions.
Definition: SessionSettings.h:237
THREAD_PROC
#define THREAD_PROC
Definition: Utility.h:184
FIX::IntConvertor::convert
static std::string convert(signed_int value)
Definition: FieldConvertors.h:170
FIX::RECONNECT_INTERVAL
const char RECONNECT_INTERVAL[]
Definition: SessionSettings.h:88
FIX::Log::onEvent
virtual void onEvent(const std::string &)=0
FIX::ThreadedSocketInitiator::m_reconnectInterval
int m_reconnectInterval
Definition: ThreadedSocketInitiator.h:91
FIX::Dictionary::has
bool has(const std::string &) const
Check if the dictionary contains a value for key.
Definition: Dictionary.cpp:166
FIX::ThreadedSocketInitiator::m_sessionToHostNum
SessionToHostNum m_sessionToHostNum
Definition: ThreadedSocketInitiator.h:89
FIX::socket_createConnector
int socket_createConnector()
Definition: Utility.cpp:160
FIX::ThreadedSocketInitiator::SocketToThread
std::map< int, thread_id > SocketToThread
Definition: ThreadedSocketInitiator.h:68
FIX::Initiator::setDisconnected
void setDisconnected(const SessionID &)
Definition: Initiator.cpp:180
FIX::ThreadedSocketConnection
Encapsulates a socket file descriptor (multi-threaded).
Definition: ThreadedSocketConnection.h:61
Settings.h
FIX::ThreadedSocketConnection::read
bool read()
Definition: ThreadedSocketConnection.cpp:114
FIX::Dictionary::getBool
bool getBool(const std::string &) const
Get a value as a bool.
Definition: Dictionary.cpp:88
FIX::process_sleep
void process_sleep(double s)
Definition: Utility.cpp:483
FIX::Initiator::setPending
void setPending(const SessionID &)
Definition: Initiator.cpp:162
FIX::ThreadedSocketConnection::connect
bool connect()
Definition: ThreadedSocketConnection.cpp:99
FIX::ThreadedSocketInitiator
Threaded Socket implementation of Initiator.
Definition: ThreadedSocketInitiator.h:56
FIX::SOCKET_CONNECT_PORT
const char SOCKET_CONNECT_PORT[]
Definition: SessionSettings.h:82
FIX::ThreadedSocketInitiator::doConnect
void doConnect(const SessionID &s, const Dictionary &d)
Implemented to connect a session to its target.
Definition: ThreadedSocketInitiator.cpp:150
FIX
Definition: Acceptor.cpp:34
FIX::Initiator::setConnected
void setConnected(const SessionID &)
Definition: Initiator.cpp:171
ThreadedSocketInitiator.h
FIX::ThreadedSocketInitiator::removeThread
void removeThread(int s)
Definition: ThreadedSocketInitiator.cpp:207
FIX::ThreadedSocketInitiator::~ThreadedSocketInitiator
virtual ~ThreadedSocketInitiator()
Definition: ThreadedSocketInitiator.cpp:72
Session.h
FIX::socket_close
void socket_close(int s)
Definition: Utility.cpp:197
FIX::ThreadedSocketInitiator::ThreadPair
std::pair< ThreadedSocketInitiator *, ThreadedSocketConnection * > ThreadPair
Definition: ThreadedSocketInitiator.h:70
FIX::ThreadedSocketInitiator::m_lastConnect
time_t m_lastConnect
Definition: ThreadedSocketInitiator.h:90
FIX::ThreadedSocketInitiator::onStop
void onStop()
Implemented to stop a running initiator.
Definition: ThreadedSocketInitiator.cpp:120
FIX::ThreadedSocketInitiator::addThread
void addThread(int s, thread_id t)
Definition: ThreadedSocketInitiator.cpp:200
FIX::Locker
Locks/Unlocks a mutex using RAII.
Definition: Mutex.h:112
FIX::thread_join
void thread_join(thread_id thread)
Definition: Utility.cpp:454
FIX::ThreadedSocketInitiator::onConfigure
void onConfigure(const SessionSettings &)
Implemented to configure acceptor.
Definition: ThreadedSocketInitiator.cpp:77
FIX::ThreadedSocketInitiator::m_noDelay
bool m_noDelay
Definition: ThreadedSocketInitiator.h:92
FIX::ThreadedSocketInitiator::onPoll
bool onPoll(double timeout)
Implemented to connect and poll for events.
Definition: ThreadedSocketInitiator.cpp:115
FIX::ThreadedSocketInitiator::socketThread
static THREAD_PROC socketThread(void *p)
Definition: ThreadedSocketInitiator.cpp:219
FIX::ThreadedSocketInitiator::m_threads
SocketToThread m_threads
Definition: ThreadedSocketInitiator.h:95
FIX::Initiator::isStopped
bool isStopped()
Definition: Initiator.h:100
FIX::Dictionary
For storage and retrieval of key/value pairs.
Definition: Dictionary.h:53
FIX::thread_spawn
bool thread_spawn(THREAD_START_ROUTINE func, void *var, thread_id &thread)
Definition: Utility.cpp:433
FIX::ThreadedSocketInitiator::m_rcvBufSize
int m_rcvBufSize
Definition: ThreadedSocketInitiator.h:94
FIX::Session
Maintains the state and implements the logic of a FIX session.
Definition: Session.h:62
FIX::SOCKET_CONNECT_HOST
const char SOCKET_CONNECT_HOST[]
Definition: SessionSettings.h:81

Generated on Wed Apr 29 2020 19:41:30 for QuickFIX by doxygen 1.8.17 written by Dimitri van Heesch, © 1997-2001