ThreadedSocketAcceptor.cpp
Go to the documentation of this file.
1 /****************************************************************************
2 ** Copyright (c) 2001-2014
3 **
4 ** This file is part of the QuickFIX FIX Engine
5 **
6 ** This file may be distributed under the terms of the quickfixengine.org
7 ** license as defined by quickfixengine.org and appearing in the file
8 ** LICENSE included in the packaging of this file.
9 **
10 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
11 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
12 **
13 ** See http://www.quickfixengine.org/LICENSE for licensing information.
14 **
15 ** Contact ask@quickfixengine.org if any conditions of this licensing are
16 ** not clear to you.
17 **
18 ****************************************************************************/
19 
20 #ifdef _MSC_VER
21 #include "stdafx.h"
22 #else
23 #include "config.h"
24 #endif
25 
26 #include "ThreadedSocketAcceptor.h"
27 #include "Settings.h"
28 #include "Utility.h"
29 
30 namespace FIX
31 {
33  Application& application,
34  MessageStoreFactory& factory,
35  const SessionSettings& settings ) throw( ConfigError )
36 : Acceptor( application, factory, settings )
37 { socket_init(); }
38 
40  Application& application,
41  MessageStoreFactory& factory,
42  const SessionSettings& settings,
43  LogFactory& logFactory ) throw( ConfigError )
44 : Acceptor( application, factory, settings, logFactory )
45 {
46  socket_init();
47 }
48 
50 {
51  socket_term();
52 }
53 
55 throw ( ConfigError )
56 {
57  std::set<SessionID> sessions = s.getSessions();
58  std::set<SessionID>::iterator i;
59  for( i = sessions.begin(); i != sessions.end(); ++i )
60  {
61  const Dictionary& settings = s.get( *i );
62  settings.getInt( SOCKET_ACCEPT_PORT );
63  if( settings.has(SOCKET_REUSE_ADDRESS) )
64  settings.getBool( SOCKET_REUSE_ADDRESS );
65  if( settings.has(SOCKET_NODELAY) )
66  settings.getBool( SOCKET_NODELAY );
67  }
68 }
69 
70 void ThreadedSocketAcceptor::onInitialize( const SessionSettings& s )
71 throw ( RuntimeError )
72 {
73  short port = 0;
74  std::set<int> ports;
75 
76  std::set<SessionID> sessions = s.getSessions();
77  std::set<SessionID>::iterator i = sessions.begin();
78  for( ; i != sessions.end(); ++i )
79  {
80  const Dictionary& settings = s.get( *i );
81  port = (short)settings.getInt( SOCKET_ACCEPT_PORT );
82 
83  m_portToSessions[port].insert( *i );
84 
85  if( ports.find(port) != ports.end() )
86  continue;
87  ports.insert( port );
88 
89  const bool reuseAddress = settings.has( SOCKET_REUSE_ADDRESS ) ?
90  settings.getBool( SOCKET_REUSE_ADDRESS ) : true;
91 
92  const bool noDelay = settings.has( SOCKET_NODELAY ) ?
93  settings.getBool( SOCKET_NODELAY ) : false;
94 
95  const int sendBufSize = settings.has( SOCKET_SEND_BUFFER_SIZE ) ?
96  settings.getInt( SOCKET_SEND_BUFFER_SIZE ) : 0;
97 
98  const int rcvBufSize = settings.has( SOCKET_RECEIVE_BUFFER_SIZE ) ?
99  settings.getInt( SOCKET_RECEIVE_BUFFER_SIZE ) : 0;
100 
101  int socket = socket_createAcceptor( port, reuseAddress );
102  if( socket < 0 )
103  {
104  SocketException e;
105  socket_close( socket );
106  throw RuntimeError( "Unable to create, bind, or listen to port "
107  + IntConvertor::convert( (unsigned short)port ) + " (" + e.what() + ")" );
108  }
109  if( noDelay )
110  socket_setsockopt( socket, TCP_NODELAY );
111  if( sendBufSize )
112  socket_setsockopt( socket, SO_SNDBUF, sendBufSize );
113  if( rcvBufSize )
114  socket_setsockopt( socket, SO_RCVBUF, rcvBufSize );
115 
116  m_socketToPort[socket] = port;
117  m_sockets.insert( socket );
118  }
119 }
120 
122 {
123  Sockets::iterator i;
124  for( i = m_sockets.begin(); i != m_sockets.end(); ++i )
125  {
126  Locker l( m_mutex );
127  int port = m_socketToPort[*i];
128  AcceptorThreadInfo* info = new AcceptorThreadInfo( this, *i, port );
129  thread_id thread;
130  thread_spawn( &socketAcceptorThread, info, thread );
131  addThread( *i, thread );
132  }
133 }
134 
135 bool ThreadedSocketAcceptor::onPoll( double timeout )
136 {
137  return false;
138 }
139 
141 {
142  SocketToThread threads;
143  SocketToThread::iterator i;
144 
145  {
146  Locker l(m_mutex);
147 
148  time_t start = 0;
149  time_t now = 0;
150 
151  ::time( &start );
152  while ( isLoggedOn() )
153  {
154  if( ::time(&now) -5 >= start )
155  break;
156  }
157 
158  threads = m_threads;
159  m_threads.clear();
160  }
161 
162  for ( i = threads.begin(); i != threads.end(); ++i )
163  socket_close( i->first );
164  for ( i = threads.begin(); i != threads.end(); ++i )
165  thread_join( i->second );
166 }
167 
169 {
170  Locker l(m_mutex);
171 
172  m_threads[ s ] = t;
173 }
174 
176 {
177  Locker l(m_mutex);
178  SocketToThread::iterator i = m_threads.find( s );
179  if ( i != m_threads.end() )
180  {
181  thread_detach( i->second );
182  m_threads.erase( i );
183  }
184 }
185 
187 {
188  AcceptorThreadInfo * info = reinterpret_cast < AcceptorThreadInfo* > ( p );
189 
190  ThreadedSocketAcceptor* pAcceptor = info->m_pAcceptor;
191  int s = info->m_socket;
192  int port = info->m_port;
193  delete info;
194 
195  int noDelay = 0;
196  int sendBufSize = 0;
197  int rcvBufSize = 0;
198  socket_getsockopt( s, TCP_NODELAY, noDelay );
199  socket_getsockopt( s, SO_SNDBUF, sendBufSize );
200  socket_getsockopt( s, SO_RCVBUF, rcvBufSize );
201 
202  int socket = 0;
203  while ( ( !pAcceptor->isStopped() && ( socket = socket_accept( s ) ) >= 0 ) )
204  {
205  if( noDelay )
206  socket_setsockopt( socket, TCP_NODELAY );
207  if( sendBufSize )
208  socket_setsockopt( socket, SO_SNDBUF, sendBufSize );
209  if( rcvBufSize )
210  socket_setsockopt( socket, SO_RCVBUF, rcvBufSize );
211 
212  Sessions sessions = pAcceptor->m_portToSessions[port];
213 
214  ThreadedSocketConnection * pConnection =
216  ( socket, sessions, pAcceptor->getLog() );
217 
218  ConnectionThreadInfo* info = new ConnectionThreadInfo( pAcceptor, pConnection );
219 
220  {
221  Locker l( pAcceptor->m_mutex );
222 
223  std::stringstream stream;
224  stream << "Accepted connection from " << socket_peername( socket ) << " on port " << port;
225 
226  if( pAcceptor->getLog() )
227  pAcceptor->getLog()->onEvent( stream.str() );
228 
229  thread_id thread;
230  if ( !thread_spawn( &socketConnectionThread, info, thread ) )
231  {
232  delete info;
233  delete pConnection;
234  }
235  else
236  pAcceptor->addThread( socket, thread );
237  }
238  }
239 
240  if( !pAcceptor->isStopped() )
241  pAcceptor->removeThread( s );
242 
243  return 0;
244 }
245 
247 {
248  ConnectionThreadInfo * info = reinterpret_cast < ConnectionThreadInfo* > ( p );
249 
250  ThreadedSocketAcceptor* pAcceptor = info->m_pAcceptor;
251  ThreadedSocketConnection* pConnection = info->m_pConnection;
252  delete info;
253 
254  int socket = pConnection->getSocket();
255 
256  while ( pConnection->read() ) {}
257  delete pConnection;
258  if( !pAcceptor->isStopped() )
259  pAcceptor->removeThread( socket );
260  return 0;
261 }
262 }
FIX::ThreadedSocketAcceptor::onStart
void onStart()
Implemented to start listening for connections.
Definition: ThreadedSocketAcceptor.cpp:138
FIX::thread_id
pthread_t thread_id
Definition: Utility.h:190
FIX::SOCKET_RECEIVE_BUFFER_SIZE
const char SOCKET_RECEIVE_BUFFER_SIZE[]
Definition: SessionSettings.h:87
FIX::Acceptor::getLog
Log * getLog()
Definition: Acceptor.h:76
FIX::ThreadedSocketAcceptor::m_sockets
Sockets m_sockets
Definition: ThreadedSocketAcceptor.h:123
FIX::ThreadedSocketAcceptor::AcceptorThreadInfo
Definition: ThreadedSocketAcceptor.h:83
FIX::ThreadedSocketAcceptor::AcceptorThreadInfo::m_port
int m_port
Definition: ThreadedSocketAcceptor.h:90
FIX::ThreadedSocketAcceptor::onStop
void onStop()
Implemented to stop a running acceptor.
Definition: ThreadedSocketAcceptor.cpp:157
FIX::ThreadedSocketAcceptor::AcceptorThreadInfo::m_socket
int m_socket
Definition: ThreadedSocketAcceptor.h:89
FIX::SOCKET_NODELAY
const char SOCKET_NODELAY[]
Definition: SessionSettings.h:85
FIX::SocketException
Socket Error.
Definition: Exceptions.h:262
FIX::ThreadedSocketAcceptor::removeThread
void removeThread(int s)
Definition: ThreadedSocketAcceptor.cpp:192
FIX::socket_setsockopt
int socket_setsockopt(int s, int opt)
Definition: Utility.cpp:225
FIX::ThreadedSocketAcceptor::m_socketToPort
SocketToPort m_socketToPort
Definition: ThreadedSocketAcceptor.h:125
FIX::Dictionary::getInt
int getInt(const std::string &) const
Get a value as a int.
Definition: Dictionary.cpp:62
FIX::socket_init
void socket_init()
Definition: Utility.cpp:98
FIX::ThreadedSocketAcceptor::onConfigure
void onConfigure(const SessionSettings &)
Implemented to configure acceptor.
Definition: ThreadedSocketAcceptor.cpp:71
FIX::ThreadedSocketAcceptor::m_threads
SocketToThread m_threads
Definition: ThreadedSocketAcceptor.h:126
FIX::Acceptor::start
void start()
Start acceptor.
Definition: Acceptor.cpp:175
FIX::SOCKET_SEND_BUFFER_SIZE
const char SOCKET_SEND_BUFFER_SIZE[]
Definition: SessionSettings.h:86
FIX::ThreadedSocketAcceptor::SocketToThread
std::map< int, thread_id > SocketToThread
Definition: ThreadedSocketAcceptor.h:109
FIX::thread_detach
void thread_detach(thread_id thread)
Definition: Utility.cpp:464
FIX::socket_getsockopt
int socket_getsockopt(int s, int opt, int &optval)
Definition: Utility.cpp:250
FIX::ConfigError
Application is not configured correctly
Definition: Exceptions.h:104
FIX::socket_term
void socket_term()
Definition: Utility.cpp:113
FIX::RuntimeError
Application encountered serious error during runtime
Definition: Exceptions.h:111
FIX::SOCKET_ACCEPT_PORT
const char SOCKET_ACCEPT_PORT[]
Definition: SessionSettings.h:79
FIX::ThreadedSocketAcceptor
Threaded Socket implementation of Acceptor.
Definition: ThreadedSocketAcceptor.h:53
FIX::SessionSettings
Container for setting dictionaries mapped to sessions.
Definition: SessionSettings.h:237
THREAD_PROC
#define THREAD_PROC
Definition: Utility.h:184
FIX::ThreadedSocketAcceptor::~ThreadedSocketAcceptor
virtual ~ThreadedSocketAcceptor()
Definition: ThreadedSocketAcceptor.cpp:66
FIX::IntConvertor::convert
static std::string convert(signed_int value)
Definition: FieldConvertors.h:170
FIX::ThreadedSocketAcceptor::socketAcceptorThread
static THREAD_PROC socketAcceptorThread(void *p)
Definition: ThreadedSocketAcceptor.cpp:203
FIX::Acceptor::isLoggedOn
bool isLoggedOn()
Check to see if any sessions are currently logged on.
Definition: Acceptor.cpp:247
FIX::Acceptor::Sessions
std::map< SessionID, Session * > Sessions
Definition: Acceptor.h:127
FIX::Log::onEvent
virtual void onEvent(const std::string &)=0
FIX::socket_accept
int socket_accept(int s)
Definition: Utility.cpp:181
FIX::Dictionary::has
bool has(const std::string &) const
Check if the dictionary contains a value for key.
Definition: Dictionary.cpp:166
FIX::ThreadedSocketConnection
Encapsulates a socket file descriptor (multi-threaded).
Definition: ThreadedSocketConnection.h:61
FIX::ThreadedSocketAcceptor::socketConnectionThread
static THREAD_PROC socketConnectionThread(void *p)
Definition: ThreadedSocketAcceptor.cpp:263
FIX::ThreadedSocketAcceptor::addThread
void addThread(int s, thread_id t)
Definition: ThreadedSocketAcceptor.cpp:185
Settings.h
FIX::ThreadedSocketAcceptor::AcceptorThreadInfo::m_pAcceptor
ThreadedSocketAcceptor * m_pAcceptor
Definition: ThreadedSocketAcceptor.h:88
FIX::ThreadedSocketAcceptor::m_portToSessions
PortToSessions m_portToSessions
Definition: ThreadedSocketAcceptor.h:124
FIX::Dictionary::getBool
bool getBool(const std::string &) const
Get a value as a bool.
Definition: Dictionary.cpp:88
FIX::ThreadedSocketAcceptor::onPoll
bool onPoll(double timeout)
Implemented to connect and poll for events.
Definition: ThreadedSocketAcceptor.cpp:152
ThreadedSocketAcceptor.h
FIX
Definition: Acceptor.cpp:34
FIX::ThreadedSocketAcceptor::ThreadedSocketAcceptor
ThreadedSocketAcceptor(Application &, MessageStoreFactory &, const SessionSettings &)
Definition: ThreadedSocketAcceptor.cpp:49
FIX::socket_createAcceptor
int socket_createAcceptor(int port, bool reuse)
Definition: Utility.cpp:137
FIX::ThreadedSocketAcceptor::m_mutex
Mutex m_mutex
Definition: ThreadedSocketAcceptor.h:127
FIX::socket_close
void socket_close(int s)
Definition: Utility.cpp:197
FIX::Locker
Locks/Unlocks a mutex using RAII.
Definition: Mutex.h:112
FIX::thread_join
void thread_join(thread_id thread)
Definition: Utility.cpp:454
FIX::ThreadedSocketAcceptor::ConnectionThreadInfo
Definition: ThreadedSocketAcceptor.h:93
Utility.h
FIX::Acceptor::isStopped
bool isStopped()
Definition: Acceptor.h:104
FIX::SOCKET_REUSE_ADDRESS
const char SOCKET_REUSE_ADDRESS[]
Definition: SessionSettings.h:80
FIX::Dictionary
For storage and retrieval of key/value pairs.
Definition: Dictionary.h:53
FIX::ThreadedSocketAcceptor::onInitialize
void onInitialize(const SessionSettings &)
Implemented to initialize acceptor.
Definition: ThreadedSocketAcceptor.cpp:87
FIX::thread_spawn
bool thread_spawn(THREAD_START_ROUTINE func, void *var, thread_id &thread)
Definition: Utility.cpp:433
FIX::socket_peername
const char * socket_peername(int socket)
Definition: Utility.cpp:370

Generated on Wed Apr 29 2020 19:41:30 for QuickFIX by doxygen 1.8.17 written by Dimitri van Heesch, © 1997-2001