ThreadedSocketAcceptor.cpp
Go to the documentation of this file.
1 /****************************************************************************
2 ** Copyright (c) 2001-2014
3 **
4 ** This file is part of the QuickFIX FIX Engine
5 **
6 ** This file may be distributed under the terms of the quickfixengine.org
7 ** license as defined by quickfixengine.org and appearing in the file
8 ** LICENSE included in the packaging of this file.
9 **
10 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
11 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
12 **
13 ** See http://www.quickfixengine.org/LICENSE for licensing information.
14 **
15 ** Contact ask@quickfixengine.org if any conditions of this licensing are
16 ** not clear to you.
17 **
18 ****************************************************************************/
19 
20 #ifdef _MSC_VER
21 #include "stdafx.h"
22 #else
23 #include "config.h"
24 #endif
25 
26 #include "ThreadedSocketAcceptor.h"
27 #include "Settings.h"
28 #include "Utility.h"
29 
30 namespace FIX
31 {
33  Application& application,
34  MessageStoreFactory& factory,
35  const SessionSettings& settings ) throw( ConfigError )
36 : Acceptor( application, factory, settings )
37 { socket_init(); }
38 
40  Application& application,
41  MessageStoreFactory& factory,
42  const SessionSettings& settings,
43  LogFactory& logFactory ) throw( ConfigError )
44 : Acceptor( application, factory, settings, logFactory )
45 {
46  socket_init();
47 }
48 
50 {
51  socket_term();
52 }
53 
55 throw ( ConfigError )
56 {
57  std::set<SessionID> sessions = s.getSessions();
58  std::set<SessionID>::iterator i;
59  for( i = sessions.begin(); i != sessions.end(); ++i )
60  {
61  const Dictionary& settings = s.get( *i );
62  settings.getInt( SOCKET_ACCEPT_PORT );
63  if( settings.has(SOCKET_REUSE_ADDRESS) )
64  settings.getBool( SOCKET_REUSE_ADDRESS );
65  if( settings.has(SOCKET_NODELAY) )
66  settings.getBool( SOCKET_NODELAY );
67  }
68 }
69 
71 throw ( RuntimeError )
72 {
73  short port = 0;
74  std::set<int> ports;
75 
76  std::set<SessionID> sessions = s.getSessions();
77  std::set<SessionID>::iterator i = sessions.begin();
78  for( ; i != sessions.end(); ++i )
79  {
80  const Dictionary& settings = s.get( *i );
81  port = (short)settings.getInt( SOCKET_ACCEPT_PORT );
82 
83  m_portToSessions[port].insert( *i );
84 
85  if( ports.find(port) != ports.end() )
86  continue;
87  ports.insert( port );
88 
89  const bool reuseAddress = settings.has( SOCKET_REUSE_ADDRESS ) ?
90  settings.getBool( SOCKET_REUSE_ADDRESS ) : true;
91 
92  const bool noDelay = settings.has( SOCKET_NODELAY ) ?
93  settings.getBool( SOCKET_NODELAY ) : false;
94 
95  const int sendBufSize = settings.has( SOCKET_SEND_BUFFER_SIZE ) ?
96  settings.getInt( SOCKET_SEND_BUFFER_SIZE ) : 0;
97 
98  const int rcvBufSize = settings.has( SOCKET_RECEIVE_BUFFER_SIZE ) ?
99  settings.getInt( SOCKET_RECEIVE_BUFFER_SIZE ) : 0;
100 
101  int socket = socket_createAcceptor( port, reuseAddress );
102  if( socket < 0 )
103  {
104  SocketException e;
105  socket_close( socket );
106  throw RuntimeError( "Unable to create, bind, or listen to port "
107  + IntConvertor::convert( (unsigned short)port ) + " (" + e.what() + ")" );
108  }
109  if( noDelay )
110  socket_setsockopt( socket, TCP_NODELAY );
111  if( sendBufSize )
112  socket_setsockopt( socket, SO_SNDBUF, sendBufSize );
113  if( rcvBufSize )
114  socket_setsockopt( socket, SO_RCVBUF, rcvBufSize );
115 
116  m_socketToPort[socket] = port;
117  m_sockets.insert( socket );
118  }
119 }
120 
122 {
123  Sockets::iterator i;
124  for( i = m_sockets.begin(); i != m_sockets.end(); ++i )
125  {
126  Locker l( m_mutex );
127  int port = m_socketToPort[*i];
128  AcceptorThreadInfo* info = new AcceptorThreadInfo( this, *i, port );
129  thread_id thread;
130  thread_spawn( &socketAcceptorThread, info, thread );
131  addThread( *i, thread );
132  }
133 }
134 
135 bool ThreadedSocketAcceptor::onPoll( double timeout )
136 {
137  return false;
138 }
139 
141 {
142  SocketToThread threads;
143  SocketToThread::iterator i;
144 
145  {
146  Locker l(m_mutex);
147 
148  time_t start = 0;
149  time_t now = 0;
150 
151  ::time( &start );
152  while ( isLoggedOn() )
153  {
154  if( ::time(&now) -5 >= start )
155  break;
156  }
157 
158  threads = m_threads;
159  m_threads.clear();
160  }
161 
162  for ( i = threads.begin(); i != threads.end(); ++i )
163  socket_close( i->first );
164  for ( i = threads.begin(); i != threads.end(); ++i )
165  thread_join( i->second );
166 }
167 
169 {
170  Locker l(m_mutex);
171 
172  m_threads[ s ] = t;
173 }
174 
176 {
177  Locker l(m_mutex);
178  SocketToThread::iterator i = m_threads.find( s );
179  if ( i != m_threads.end() )
180  {
181  thread_detach( i->second );
182  m_threads.erase( i );
183  }
184 }
185 
187 {
188  AcceptorThreadInfo * info = reinterpret_cast < AcceptorThreadInfo* > ( p );
189 
190  ThreadedSocketAcceptor* pAcceptor = info->m_pAcceptor;
191  int s = info->m_socket;
192  int port = info->m_port;
193  delete info;
194 
195  int noDelay = 0;
196  int sendBufSize = 0;
197  int rcvBufSize = 0;
198  socket_getsockopt( s, TCP_NODELAY, noDelay );
199  socket_getsockopt( s, SO_SNDBUF, sendBufSize );
200  socket_getsockopt( s, SO_RCVBUF, rcvBufSize );
201 
202  int socket = 0;
203  while ( ( !pAcceptor->isStopped() && ( socket = socket_accept( s ) ) >= 0 ) )
204  {
205  if( noDelay )
206  socket_setsockopt( socket, TCP_NODELAY );
207  if( sendBufSize )
208  socket_setsockopt( socket, SO_SNDBUF, sendBufSize );
209  if( rcvBufSize )
210  socket_setsockopt( socket, SO_RCVBUF, rcvBufSize );
211 
212  Sessions sessions = pAcceptor->m_portToSessions[port];
213 
214  ThreadedSocketConnection * pConnection =
216  ( socket, sessions, pAcceptor->getLog() );
217 
218  ConnectionThreadInfo* info = new ConnectionThreadInfo( pAcceptor, pConnection );
219 
220  {
221  Locker l( pAcceptor->m_mutex );
222 
223  std::stringstream stream;
224  stream << "Accepted connection from " << socket_peername( socket ) << " on port " << port;
225 
226  if( pAcceptor->getLog() )
227  pAcceptor->getLog()->onEvent( stream.str() );
228 
229  thread_id thread;
230  if ( !thread_spawn( &socketConnectionThread, info, thread ) )
231  {
232  delete info;
233  delete pConnection;
234  }
235  else
236  pAcceptor->addThread( socket, thread );
237  }
238  }
239 
240  if( !pAcceptor->isStopped() )
241  pAcceptor->removeThread( s );
242 
243  return 0;
244 }
245 
247 {
248  ConnectionThreadInfo * info = reinterpret_cast < ConnectionThreadInfo* > ( p );
249 
250  ThreadedSocketAcceptor* pAcceptor = info->m_pAcceptor;
251  ThreadedSocketConnection* pConnection = info->m_pConnection;
252  delete info;
253 
254  int socket = pConnection->getSocket();
255 
256  while ( pConnection->read() ) {}
257  delete pConnection;
258  if( !pAcceptor->isStopped() )
259  pAcceptor->removeThread( socket );
260  return 0;
261 }
262 }
bool isLoggedOn()
Check to see if any sessions are currently logged on.
Definition: Acceptor.cpp:230
#define THREAD_PROC
void thread_join(thread_id thread)
Definition: Utility.cpp:437
int socket_getsockopt(int s, int opt, int &optval)
Definition: Utility.cpp:233
int socket_accept(int s)
Definition: Utility.cpp:164
void onInitialize(const SessionSettings &)
Implemented to initialize acceptor.
void socket_init()
Definition: Utility.cpp:81
void onConfigure(const SessionSettings &)
Implemented to configure acceptor.
Encapsulates a socket file descriptor (multi-threaded).
void socket_term()
Definition: Utility.cpp:96
void thread_detach(thread_id thread)
Definition: Utility.cpp:447
const char SOCKET_NODELAY[]
const char SOCKET_ACCEPT_PORT[]
void onStop()
Implemented to stop a running acceptor.
const char SOCKET_SEND_BUFFER_SIZE[]
bool thread_spawn(THREAD_START_ROUTINE func, void *var, thread_id &thread)
Definition: Utility.cpp:416
bool has(const std::string &) const
Check if the dictionary contains a value for key.
Definition: Dictionary.cpp:149
static THREAD_PROC socketAcceptorThread(void *p)
static std::string convert(signed_int value)
const char SOCKET_RECEIVE_BUFFER_SIZE[]
int socket_setsockopt(int s, int opt)
Definition: Utility.cpp:208
Definition: Acceptor.cpp:34
std::map< int, thread_id > SocketToThread
This interface must be implemented to define what your FIX application does.
Definition: Application.h:43
void start()
Start acceptor.
Definition: Acceptor.cpp:158
Application encountered serious error during runtime
Definition: Exceptions.h:94
Application is not configured correctly
Definition: Exceptions.h:87
Locks/Unlocks a mutex using RAII.
Definition: Mutex.h:95
Container for setting dictionaries mapped to sessions.
bool onPoll(double timeout)
Implemented to connect and poll for events.
Base for classes which act as an acceptor for incoming connections.
Definition: Acceptor.h:49
std::map< SessionID, Session *> Sessions
Definition: Acceptor.h:110
static THREAD_PROC socketConnectionThread(void *p)
ThreadedSocketAcceptor(Application &, MessageStoreFactory &, const SessionSettings &)
This interface must be implemented to create a Log.
Definition: Log.h:42
This interface must be implemented to create a MessageStore.
Definition: MessageStore.h:41
void addThread(int s, thread_id t)
Threaded Socket implementation of Acceptor.
Socket Error.
Definition: Exceptions.h:245
int socket_createAcceptor(int port, bool reuse)
Definition: Utility.cpp:120
pthread_t thread_id
Definition: Utility.h:190
For storage and retrieval of key/value pairs.
Definition: Dictionary.h:36
const char SOCKET_REUSE_ADDRESS[]
void socket_close(int s)
Definition: Utility.cpp:180
bool getBool(const std::string &) const
Get a value as a bool.
Definition: Dictionary.cpp:71
int getInt(const std::string &) const
Get a value as a int.
Definition: Dictionary.cpp:45
const char * socket_peername(int socket)
Definition: Utility.cpp:353
void onStart()
Implemented to start listening for connections.

Generated on Wed Aug 28 2019 14:13:46 for QuickFIX by doxygen 1.8.13 written by Dimitri van Heesch, © 1997-2001