ThreadedSocketConnection.cpp
Go to the documentation of this file.
1 /****************************************************************************
2 ** Copyright (c) 2001-2014
3 **
4 ** This file is part of the QuickFIX FIX Engine
5 **
6 ** This file may be distributed under the terms of the quickfixengine.org
7 ** license as defined by quickfixengine.org and appearing in the file
8 ** LICENSE included in the packaging of this file.
9 **
10 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
11 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
12 **
13 ** See http://www.quickfixengine.org/LICENSE for licensing information.
14 **
15 ** Contact ask@quickfixengine.org if any conditions of this licensing are
16 ** not clear to you.
17 **
18 ****************************************************************************/
19 
20 #ifdef _MSC_VER
21 #include "stdafx.h"
22 #else
23 #include "config.h"
24 #endif
25 
27 #include "ThreadedSocketAcceptor.h"
29 #include "Session.h"
30 #include "Utility.h"
31 
32 namespace FIX
33 {
// Constructor taking an already-open socket, a set of sessions and a log.
// No session is attached yet: m_pSession starts as 0 and is chosen later
// by setSession() from the first message that processStream() sees.
// NOTE(review): the line carrying the qualified constructor name is not
// present in this listing.
35 ( int s, Sessions sessions, Log* pLog )
36 : m_socket( s ), m_pLog( pLog ),
37  m_sessions( sessions ), m_pSession( 0 ),
38  m_disconnect( false )
39 {
 // m_fds holds only this connection's socket; read() copies it as the
 // read set handed to select().
40  FD_ZERO( &m_fds );
41  FD_SET( m_socket, &m_fds );
42 }
43 
// Constructor taking a session id, a socket, the remote address/port, a log
// and an optional source address/port. The address and port values are
// stored for connect(); the session is looked up immediately by id.
// NOTE(review): the line carrying the qualified constructor name is not
// present in this listing.
45 ( const SessionID& sessionID, int s,
46  const std::string& address, short port,
47  Log* pLog,
48  const std::string& sourceAddress, short sourcePort )
49  : m_socket( s ), m_address( address ), m_port( port ),
50  m_sourceAddress( sourceAddress ), m_sourcePort( sourcePort ),
51  m_pLog( pLog ),
52  m_pSession( Session::lookupSession( sessionID ) ),
53  m_disconnect( false )
54 {
 // m_fds holds only this connection's socket; read() copies it as the
 // read set handed to select().
55  FD_ZERO( &m_fds );
56  FD_SET( m_socket, &m_fds );
 // lookupSession may yield a null pointer; only attach when a session exists.
57  if ( m_pSession ) m_pSession->setResponder( this );
58 }
59 
// Destructor body: if a session is attached, clear its responder so it no
// longer points at this object, then unregister the session by its id.
// NOTE(review): the signature line is not present in this listing.
61 {
62  if ( m_pSession )
63  {
64  m_pSession->setResponder( 0 );
65  Session::unregisterSession( m_pSession->getSessionID() );
66  }
67 }
68 
69 bool ThreadedSocketConnection::send( const std::string& msg )
70 {
71  int totalSent = 0;
72  while(totalSent < (int)msg.length())
73  {
74  ssize_t sent = socket_send( m_socket, msg.c_str() + totalSent, msg.length() );
75  if(sent < 0) return false;
76  totalSent += sent;
77  }
78 
79  return true;
80 }
81 
// connect() body: optionally binds the socket to the configured source
// address/port, then connects to m_address:m_port.
// Returns true when socket_connect yields a non-negative result.
// The return value of socket_bind is not checked here.
// NOTE(review): the signature line is not present in this listing.
83 {
84  // do the bind in the thread as name resolution may block
85  if ( !m_sourceAddress.empty() || m_sourcePort )
86  socket_bind( m_socket, m_sourceAddress.c_str(), m_sourcePort );
87 
88  return socket_connect(getSocket(), m_address.c_str(), m_port) >= 0;
89 }
90 
// disconnect() body: records that the disconnect was requested locally and
// closes the socket. read() consults m_disconnect so that the receive
// failure caused by this close is returned quietly instead of being logged.
// NOTE(review): the signature line is not present in this listing.
92 {
93  m_disconnect = true;
94  socket_close( m_socket );
95 }
96 
// read() body: performs one polling step on the connection.
// Waits up to one second for the socket to become readable, feeds any
// received bytes to the parser, gives the session a timer tick on timeout,
// and then processes whatever complete messages the parser holds.
// Returns true after a normal step; returns false after a receive failure
// (the connection has been, or is being, torn down).
// NOTE(review): the signature line is not present in this listing.
98 {
99  struct timeval timeout = { 1, 0 };
 // select() modifies the set it is given, so work on a copy of m_fds.
100  fd_set readset = m_fds;
101 
102  try
103  {
104  // Wait for input (1 second timeout)
105  int result = select( 1 + m_socket, &readset, 0, 0, &timeout );
106 
107  if( result > 0 ) // Something to read
108  {
109  // We can read without blocking
110  ssize_t size = socket_recv( m_socket, m_buffer, sizeof(m_buffer) );
 // A zero or negative size is treated as a failed receive.
111  if ( size <= 0 ) { throw SocketRecvFailed( size ); }
112  m_parser.addToStream( m_buffer, size );
113  }
114  else if( result == 0 && m_pSession ) // Timeout
115  {
 // No data arrived: let the attached session run its no-argument next().
116  m_pSession->next();
117  }
118  else if( result < 0 ) // Error
119  {
120  throw SocketRecvFailed( result );
121  }
122 
 // Runs on every non-failing step, including timeouts.
123  processStream();
124  return true;
125  }
126  catch ( SocketRecvFailed& e )
127  {
 // Failure caused by our own disconnect(): nothing to log or tear down.
128  if( m_disconnect )
129  return false;
130 
131  if( m_pSession )
132  {
133  m_pSession->getLog()->onEvent( e.what() );
134  m_pSession->disconnect();
135  }
136  else
137  {
 // No session attached yet, so close the socket directly.
138  disconnect();
139  }
140 
141  return false;
142  }
143 }
144 
145 bool ThreadedSocketConnection::readMessage( std::string& msg )
146 throw( SocketRecvFailed )
147 {
148  try
149  {
150  return m_parser.readFixMessage( msg );
151  }
152  catch ( MessageParseError& ) {}
153  return true;
154 }
155 
// processStream() body: drains the messages readMessage() yields and hands
// each one to the attached session.
// When no session is attached yet, the message is first passed to
// setSession() to pick one; if that fails the connection is disconnected
// and the loop moves on to the next buffered message.
// NOTE(review): the signature line is not present in this listing.
157 {
158  std::string msg;
159  while( readMessage(msg) )
160  {
161  if ( !m_pSession )
162  {
163  if ( !setSession( msg ) )
 // NOTE(review): the loop continues after the socket has been closed, so
 // later buffered messages are still examined - confirm whether leaving
 // the function was intended here.
164  { disconnect(); continue; }
165  }
166  try
167  {
 // Deliver the message together with a freshly constructed UtcTimeStamp.
168  m_pSession->next( msg, UtcTimeStamp() );
169  }
170  catch( InvalidMessage& )
171  {
 // An invalid message is ignored while logged on; otherwise the
 // connection is dropped and processing stops.
172  if( !m_pSession->isLoggedOn() )
173  {
174  disconnect();
175  return;
176  }
177  }
178  }
179 }
180 
181 bool ThreadedSocketConnection::setSession( const std::string& msg )
182 {
183  m_pSession = Session::lookupSession( msg, true );
184  if ( !m_pSession )
185  {
186  if( m_pLog )
187  {
188  m_pLog->onEvent( "Session not found for incoming message: " + msg );
189  m_pLog->onIncoming( msg );
190  }
191  return false;
192  }
193 
194  SessionID sessionID = m_pSession->getSessionID();
195  m_pSession = 0;
196 
197  // see if the session frees up within 5 seconds
198  for( int i = 1; i <= 5; i++ )
199  {
200  if( !Session::isSessionRegistered( sessionID ) )
201  m_pSession = Session::registerSession( sessionID );
202  if( m_pSession ) break;
203  process_sleep( 1 );
204  }
205 
206  if ( !m_pSession )
207  return false;
208  if ( m_sessions.find(m_pSession->getSessionID()) == m_sessions.end() )
209  return false;
210 
211  m_pSession->setResponder( this );
212  return true;
213 }
214 
215 } // namespace FIX
FIX::ThreadedSocketConnection::readMessage
bool readMessage(std::string &msg)
Definition: ThreadedSocketConnection.cpp:162
FIX::ThreadedSocketConnection::setSession
bool setSession(const std::string &msg)
Definition: ThreadedSocketConnection.cpp:198
FIX::Session::lookupSession
static Session * lookupSession(const SessionID &)
Definition: Session.cpp:1513
FIX::Session::unregisterSession
static void unregisterSession(const SessionID &)
Definition: Session.cpp:1564
FIX::ThreadedSocketConnection::send
bool send(const std::string &)
Definition: ThreadedSocketConnection.cpp:86
FIX::socket_connect
int socket_connect(int socket, const char *address, int port)
Definition: Utility.cpp:165
FIX::Session::registerSession
static Session * registerSession(const SessionID &)
Definition: Session.cpp:1554
FIX::ThreadedSocketConnection::processStream
void processStream()
Definition: ThreadedSocketConnection.cpp:173
FIX::ThreadedSocketConnection::disconnect
void disconnect()
Definition: ThreadedSocketConnection.cpp:108
FIX::socket_recv
ssize_t socket_recv(int s, char *buf, size_t length)
Definition: Utility.cpp:187
FIX::socket_bind
int socket_bind(int socket, const char *hostname, int port)
Definition: Utility.cpp:120
FIX::TYPE::UtcTimeStamp
@ UtcTimeStamp
Definition: FieldTypes.h:940
FIX::Log::onEvent
virtual void onEvent(const std::string &)=0
FIX::Acceptor::m_pLog
Log * m_pLog
Definition: Acceptor.h:138
FIX::Session::isSessionRegistered
static bool isSessionRegistered(const SessionID &)
Definition: Session.cpp:1548
FIX::socket_send
ssize_t socket_send(int s, const char *msg, size_t length)
Definition: Utility.cpp:192
FIX::ThreadedSocketConnection::read
bool read()
Definition: ThreadedSocketConnection.cpp:114
FIX::process_sleep
void process_sleep(double s)
Definition: Utility.cpp:483
ThreadedSocketAcceptor.h
FIX::ThreadedSocketConnection::connect
bool connect()
Definition: ThreadedSocketConnection.cpp:99
FIX
Definition: Acceptor.cpp:34
ThreadedSocketInitiator.h
ThreadedSocketConnection.h
FIX::ThreadedSocketConnection::ThreadedSocketConnection
ThreadedSocketConnection(int s, Sessions sessions, Log *pLog)
Definition: ThreadedSocketConnection.cpp:52
Session.h
FIX::socket_close
void socket_close(int s)
Definition: Utility.cpp:197
FIX::SocketRecvFailed
Socket recv operation failed.
Definition: Exceptions.h:295
FIX::ThreadedSocketConnection::~ThreadedSocketConnection
virtual ~ThreadedSocketConnection()
Definition: ThreadedSocketConnection.cpp:77
FIX::Log::onIncoming
virtual void onIncoming(const std::string &)=0
Utility.h
FIX::Acceptor::m_sessions
Sessions m_sessions
Definition: Acceptor.h:130
FIX::Session
Maintains the state and implements the logic of a FIX session.
Definition: Session.h:62

Generated on Wed Apr 29 2020 19:41:30 for QuickFIX by doxygen 1.8.17 written by Dimitri van Heesch, © 1997-2001