SocketConnection.cpp
Go to the documentation of this file.
1 /****************************************************************************
2 ** Copyright (c) 2001-2014
3 **
4 ** This file is part of the QuickFIX FIX Engine
5 **
6 ** This file may be distributed under the terms of the quickfixengine.org
7 ** license as defined by quickfixengine.org and appearing in the file
8 ** LICENSE included in the packaging of this file.
9 **
10 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
11 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
12 **
13 ** See http://www.quickfixengine.org/LICENSE for licensing information.
14 **
15 ** Contact ask@quickfixengine.org if any conditions of this licensing are
16 ** not clear to you.
17 **
18 ****************************************************************************/
19 
20 #ifdef _MSC_VER
21 #include "stdafx.h"
22 #else
23 #include "config.h"
24 #endif
25 
26 #include "SocketConnection.h"
27 #include "SocketAcceptor.h"
28 #include "SocketConnector.h"
29 #include "SocketInitiator.h"
30 #include "Session.h"
31 #include "Utility.h"
32 
33 namespace FIX
34 {
35 SocketConnection::SocketConnection( int s, Sessions sessions,
36  SocketMonitor* pMonitor )
37 : m_socket( s ), m_sendLength( 0 ),
38  m_sessions(sessions), m_pSession( 0 ), m_pMonitor( pMonitor )
39 {
40  FD_ZERO( &m_fds );
41  FD_SET( m_socket, &m_fds );
42 }
43 
44 SocketConnection::SocketConnection( SocketInitiator& i,
45  const SessionID& sessionID, int s,
46  SocketMonitor* pMonitor )
47 : m_socket( s ), m_sendLength( 0 ),
48  m_pSession( i.getSession( sessionID, *this ) ),
49  m_pMonitor( pMonitor )
50 {
51  FD_ZERO( &m_fds );
52  FD_SET( m_socket, &m_fds );
53  m_sessions.insert( sessionID );
54 }
55 
57 {
58  if ( m_pSession )
60 }
61 
62 bool SocketConnection::send( const std::string& msg )
63 {
64  Locker l( m_mutex );
65 
66  m_sendQueue.push_back( msg );
67  processQueue();
68  signal();
69  return true;
70 }
71 
73 {
74  Locker l( m_mutex );
75 
76  if( !m_sendQueue.size() ) return true;
77 
78  struct timeval timeout = { 0, 0 };
79  fd_set writeset = m_fds;
80  if( select( 1 + m_socket, 0, &writeset, 0, &timeout ) <= 0 )
81  return false;
82 
83  const std::string& msg = m_sendQueue.front();
84 
85  ssize_t result = socket_send
86  ( m_socket, msg.c_str() + m_sendLength, msg.length() - m_sendLength );
87 
88  if( result > 0 )
89  m_sendLength += result;
90 
91  if( m_sendLength == msg.length() )
92  {
93  m_sendLength = 0;
94  m_sendQueue.pop_front();
95  }
96 
97  return !m_sendQueue.size();
98 }
99 
101 {
102  if ( m_pMonitor )
104 }
105 
106 bool SocketConnection::read( SocketConnector& s )
107 {
108  if ( !m_pSession ) return false;
109 
110  try
111  {
112  readFromSocket();
113  readMessages( s.getMonitor() );
114  }
115  catch( SocketRecvFailed& e )
116  {
117  m_pSession->getLog()->onEvent( e.what() );
118  return false;
119  }
120  return true;
121 }
122 
124 {
125  std::string msg;
126  try
127  {
128  if ( !m_pSession )
129  {
130  struct timeval timeout = { 1, 0 };
131  fd_set readset = m_fds;
132 
133  while( !readMessage( msg ) )
134  {
135  int result = select( 1 + m_socket, &readset, 0, 0, &timeout );
136  if( result > 0 )
137  readFromSocket();
138  else if( result == 0 )
139  return false;
140  else if( result < 0 )
141  return false;
142  }
143 
144  m_pSession = Session::lookupSession( msg, true );
145  if( !isValidSession() )
146  {
147  m_pSession = 0;
148  if( a.getLog() )
149  {
150  a.getLog()->onEvent( "Session not found for incoming message: " + msg );
151  a.getLog()->onIncoming( msg );
152  }
153  }
154  if( m_pSession )
155  m_pSession = a.getSession( msg, *this );
156  if( m_pSession )
157  m_pSession->next( msg, UtcTimeStamp() );
158  if( !m_pSession )
159  {
160  s.getMonitor().drop( m_socket );
161  return false;
162  }
163 
165  return true;
166  }
167  else
168  {
169  readFromSocket();
170  readMessages( s.getMonitor() );
171  return true;
172  }
173  }
174  catch ( SocketRecvFailed& e )
175  {
176  if( m_pSession )
177  m_pSession->getLog()->onEvent( e.what() );
178  s.getMonitor().drop( m_socket );
179  }
180  catch ( InvalidMessage& )
181  {
182  s.getMonitor().drop( m_socket );
183  }
184  return false;
185 }
186 
188 {
189  if( m_pSession == 0 )
190  return false;
191  SessionID sessionID = m_pSession->getSessionID();
192  if( Session::isSessionRegistered(sessionID) )
193  return false;
194  return !( m_sessions.find(sessionID) == m_sessions.end() );
195 }
196 
198 throw( SocketRecvFailed )
199 {
200  ssize_t size = socket_recv( m_socket, m_buffer, sizeof(m_buffer) );
201  if( size <= 0 ) throw SocketRecvFailed( size );
202  m_parser.addToStream( m_buffer, size );
203 }
204 
205 bool SocketConnection::readMessage( std::string& msg )
206 {
207  try
208  {
209  return m_parser.readFixMessage( msg );
210  }
211  catch ( MessageParseError& ) {}
212  return true;
213 }
214 
216 {
217  if( !m_pSession ) return;
218 
219  std::string msg;
220  while( readMessage( msg ) )
221  {
222  try
223  {
224  m_pSession->next( msg, UtcTimeStamp() );
225  }
226  catch ( InvalidMessage& )
227  {
228  if( !m_pSession->isLoggedOn() )
229  s.drop( m_socket );
230  }
231  }
232 }
233 
235 {
236  if ( m_pSession ) m_pSession->next();
237 }
238 } // namespace FIX
FIX::SocketConnection::m_socket
int m_socket
Definition: SocketConnection.h:106
FIX::SocketConnection::send
bool send(const std::string &)
Definition: SocketConnection.cpp:79
FIX::Acceptor::getSession
Session * getSession(const std::string &msg, Responder &)
Definition: Acceptor.cpp:121
FIX::Acceptor::getLog
Log * getLog()
Definition: Acceptor.h:76
FIX::Session::lookupSession
static Session * lookupSession(const SessionID &)
Definition: Session.cpp:1513
FIX::Session::unregisterSession
static void unregisterSession(const SessionID &)
Definition: Session.cpp:1564
FIX::SocketConnection::onTimeout
void onTimeout()
Definition: SocketConnection.cpp:251
FIX::SocketConnection::m_pSession
Session * m_pSession
Definition: SocketConnection.h:113
FIX::SocketConnection::m_fds
fd_set m_fds
Definition: SocketConnection.h:116
FIX::SocketMonitor
Monitors events on a collection of sockets.
Definition: SocketMonitor.h:64
FIX::Session::getSessionID
const SessionID & getSessionID() const
Definition: Session.h:109
FIX::SocketConnection::readMessage
bool readMessage(std::string &msg)
Definition: SocketConnection.cpp:222
FIX::SocketConnection::m_parser
Parser m_parser
Definition: SocketConnection.h:109
FIX::SocketConnection::readFromSocket
void readFromSocket()
Definition: SocketConnection.cpp:214
FIX::Session::registerSession
static Session * registerSession(const SessionID &)
Definition: Session.cpp:1554
FIX::SocketAcceptor
Socket implementation of Acceptor.
Definition: SocketAcceptor.h:53
FIX::Session::next
void next()
Definition: Session.cpp:142
FIX::SocketServer::getMonitor
SocketMonitor & getMonitor()
Definition: SocketServer.h:87
SocketConnection.h
FIX::socket_recv
ssize_t socket_recv(int s, char *buf, size_t length)
Definition: Utility.cpp:187
FIX::SocketConnection::m_sendLength
unsigned m_sendLength
Definition: SocketConnection.h:111
FIX::TYPE::UtcTimeStamp
@ UtcTimeStamp
Definition: FieldTypes.h:940
FIX::Log::onEvent
virtual void onEvent(const std::string &)=0
FIX::Session::getLog
Log * getLog()
Definition: Session.h:261
FIX::Session::isSessionRegistered
static bool isSessionRegistered(const SessionID &)
Definition: Session.cpp:1548
FIX::socket_send
ssize_t socket_send(int s, const char *msg, size_t length)
Definition: Utility.cpp:192
FIX::SocketConnection::isValidSession
bool isValidSession()
Definition: SocketConnection.cpp:204
FIX::SocketConnection::m_buffer
char m_buffer[BUFSIZ]
Definition: SocketConnection.h:107
FIX::Session::isLoggedOn
bool isLoggedOn()
Definition: Session.h:99
FIX::SocketConnection::disconnect
void disconnect()
Definition: SocketConnection.cpp:117
FIX::SocketConnection::m_sendQueue
Queue m_sendQueue
Definition: SocketConnection.h:110
FIX::SocketConnection::~SocketConnection
virtual ~SocketConnection()
Definition: SocketConnection.cpp:73
FIX
Definition: Acceptor.cpp:34
FIX::SocketConnection::processQueue
bool processQueue()
Definition: SocketConnection.cpp:89
FIX::SocketConnection::SocketConnection
SocketConnection(int s, Sessions sessions, SocketMonitor *pMonitor)
Definition: SocketConnection.cpp:52
FIX::SocketConnection::m_mutex
Mutex m_mutex
Definition: SocketConnection.h:115
FIX::MessageParseError
Unable to parse message.
Definition: Exceptions.h:90
FIX::SocketMonitor::drop
bool drop(int socket)
Definition: SocketMonitor.cpp:115
FIX::Parser::addToStream
void addToStream(const char *str, size_t len)
Definition: Parser.h:82
FIX::Parser::readFixMessage
bool readFixMessage(std::string &str)
Definition: Parser.cpp:76
Session.h
FIX::SocketConnection::signal
void signal()
Definition: SocketConnection.h:79
FIX::SocketConnection::read
bool read(SocketConnector &s)
Definition: SocketConnection.cpp:123
SocketInitiator.h
FIX::Locker
Locks/Unlocks a mutex using RAII.
Definition: Mutex.h:112
FIX::InvalidMessage
Not a recognizable message.
Definition: Exceptions.h:97
FIX::SocketConnection::m_pMonitor
SocketMonitor * m_pMonitor
Definition: SocketConnection.h:114
FIX::SocketConnection::m_sessions
Sessions m_sessions
Definition: SocketConnection.h:112
SocketConnector.h
SocketAcceptor.h
FIX::SocketServer
Listens for and accepts incoming socket connections on a port.
Definition: SocketServer.h:73
FIX::Log::onIncoming
virtual void onIncoming(const std::string &)=0
Utility.h
FIX::Acceptor::m_sessions
Sessions m_sessions
Definition: Acceptor.h:130
FIX::SocketConnection::readMessages
void readMessages(SocketMonitor &s)
Definition: SocketConnection.cpp:232

Generated on Wed Apr 29 2020 19:41:30 for QuickFIX by doxygen 1.8.17 written by Dimitri van Heesch, © 1997-2001