Public Types | Public Member Functions | Private Member Functions | Private Attributes | List of all members
FIX::ThreadedSocketConnection Class Reference

Encapsulates a socket file descriptor (multi-threaded). More...

#include <ThreadedSocketConnection.h>

Inheritance diagram for FIX::ThreadedSocketConnection:
Inheritance graph
[legend]
Collaboration diagram for FIX::ThreadedSocketConnection:
Collaboration graph
[legend]

Public Types

typedef std::set< SessionIDSessions
 

Public Member Functions

 ThreadedSocketConnection (int s, Sessions sessions, Log *pLog)
 
 ThreadedSocketConnection (const SessionID &, int s, const std::string &address, short port, Log *pLog, const std::string &sourceAddress="", short sourcePort=0)
 
virtual ~ThreadedSocketConnection ()
 
SessiongetSession () const
 
int getSocket () const
 
bool connect ()
 
void disconnect ()
 
bool read ()
 

Private Member Functions

bool readMessage (std::string &msg) throw ( SocketRecvFailed )
 
void processStream ()
 
bool send (const std::string &)
 
bool setSession (const std::string &msg)
 
- Private Member Functions inherited from FIX::Responder
virtual ~Responder ()
 

Private Attributes

int m_socket
 
char m_buffer [BUFSIZ]
 
std::string m_address
 
int m_port
 
std::string m_sourceAddress
 
int m_sourcePort
 
Logm_pLog
 
Parser m_parser
 
Sessions m_sessions
 
Sessionm_pSession
 
bool m_disconnect
 
fd_set m_fds
 

Detailed Description

Encapsulates a socket file descriptor (multi-threaded).

Definition at line 61 of file ThreadedSocketConnection.h.

Member Typedef Documentation

◆ Sessions

Definition at line 64 of file ThreadedSocketConnection.h.

Constructor & Destructor Documentation

◆ ThreadedSocketConnection() [1/2]

FIX::ThreadedSocketConnection::ThreadedSocketConnection ( int  s,
Sessions  sessions,
Log pLog 
)

Definition at line 52 of file ThreadedSocketConnection.cpp.

54 {
55  FD_ZERO( &m_fds );
56  FD_SET( m_socket, &m_fds );
57  if ( m_pSession ) m_pSession->setResponder( this );
58 }
59 

◆ ThreadedSocketConnection() [2/2]

FIX::ThreadedSocketConnection::ThreadedSocketConnection ( const SessionID sessionID,
int  s,
const std::string &  address,
short  port,
Log pLog,
const std::string &  sourceAddress = "",
short  sourcePort = 0 
)

Definition at line 62 of file ThreadedSocketConnection.cpp.

63  {
66  }
67 }
68 
69 bool ThreadedSocketConnection::send( const std::string& msg )
70 {
71  int totalSent = 0;
72  while(totalSent < (int)msg.length())
73  {
74  ssize_t sent = socket_send( m_socket, msg.c_str() + totalSent, msg.length() );
75  if(sent < 0) return false;

References FIX::Session::unregisterSession().

◆ ~ThreadedSocketConnection()

FIX::ThreadedSocketConnection::~ThreadedSocketConnection ( )
virtual

Definition at line 77 of file ThreadedSocketConnection.cpp.

83 {
84  // do the bind in the thread as name resolution may block

Member Function Documentation

◆ connect()

bool FIX::ThreadedSocketConnection::connect ( )

Definition at line 99 of file ThreadedSocketConnection.cpp.

99  { 1, 0 };
100  fd_set readset = m_fds;
101 
102  try
103  {
104  // Wait for input (1 second timeout)
105  int result = select( 1 + m_socket, &readset, 0, 0, &timeout );
106 

◆ disconnect()

void FIX::ThreadedSocketConnection::disconnect ( )
virtual

Implements FIX::Responder.

Definition at line 108 of file ThreadedSocketConnection.cpp.

108  {
109  // We can read without blocking
110  ssize_t size = socket_recv( m_socket, m_buffer, sizeof(m_buffer) );
111  if ( size <= 0 ) { throw SocketRecvFailed( size ); }
112  m_parser.addToStream( m_buffer, size );

References FIX::socket_recv().

◆ getSession()

Session* FIX::ThreadedSocketConnection::getSession ( ) const
inline

Definition at line 73 of file ThreadedSocketConnection.h.

◆ getSocket()

int FIX::ThreadedSocketConnection::getSocket ( ) const
inline

Definition at line 74 of file ThreadedSocketConnection.h.

◆ processStream()

void FIX::ThreadedSocketConnection::processStream ( )
private

Definition at line 173 of file ThreadedSocketConnection.cpp.

173  {
174  disconnect();
175  return;
176  }
177  }
178  }
179 }
180 
181 bool ThreadedSocketConnection::setSession( const std::string& msg )
182 {
183  m_pSession = Session::lookupSession( msg, true );
184  if ( !m_pSession )
185  {
186  if( m_pLog )
187  {
188  m_pLog->onEvent( "Session not found for incoming message: " + msg );
189  m_pLog->onIncoming( msg );
190  }
191  return false;
192  }
193 
194  SessionID sessionID = m_pSession->getSessionID();
195  m_pSession = 0;
196 

◆ read()

bool FIX::ThreadedSocketConnection::read ( )

Definition at line 114 of file ThreadedSocketConnection.cpp.

115  {
116  m_pSession->next();
117  }
118  else if( result < 0 ) // Error
119  {
120  throw SocketRecvFailed( result );
121  }
122 
123  processStream();
124  return true;
125  }
126  catch ( SocketRecvFailed& e )
127  {
128  if( m_disconnect )
129  return false;
130 
131  if( m_pSession )
132  {
133  m_pSession->getLog()->onEvent( e.what() );
135  }
136  else
137  {
138  disconnect();
139  }
140 
141  return false;
142  }
143 }
144 
145 bool ThreadedSocketConnection::readMessage( std::string& msg )
146 throw( SocketRecvFailed )
147 {
148  try
149  {
150  return m_parser.readFixMessage( msg );
151  }
152  catch ( MessageParseError& ) {}
153  return true;
154 }
155 
157 {
158  std::string msg;
159  while( readMessage(msg) )
160  {

◆ readMessage()

bool FIX::ThreadedSocketConnection::readMessage ( std::string &  msg)
throw (SocketRecvFailed
)
private

Definition at line 162 of file ThreadedSocketConnection.cpp.

162  {
163  if ( !setSession( msg ) )
164  { disconnect(); continue; }
165  }
166  try
167  {
168  m_pSession->next( msg, UtcTimeStamp() );
169  }
170  catch( InvalidMessage& )
171  {

◆ send()

bool FIX::ThreadedSocketConnection::send ( const std::string &  msg)
privatevirtual

Implements FIX::Responder.

Definition at line 86 of file ThreadedSocketConnection.cpp.

92 {
93  m_disconnect = true;
95 }
96 

◆ setSession()

bool FIX::ThreadedSocketConnection::setSession ( const std::string &  msg)
private

Definition at line 198 of file ThreadedSocketConnection.cpp.

199  {
200  if( !Session::isSessionRegistered( sessionID ) )
201  m_pSession = Session::registerSession( sessionID );
202  if( m_pSession ) break;
203  process_sleep( 1 );
204  }
205 
206  if ( !m_pSession )
207  return false;
208  if ( m_sessions.find(m_pSession->getSessionID()) == m_sessions.end() )
209  return false;
210 
211  m_pSession->setResponder( this );
212  return true;
213 }
214 
215 } // namespace FIX

References FIX::Session::isSessionRegistered(), FIX::process_sleep(), and FIX::Session::registerSession().

Member Data Documentation

◆ m_address

std::string FIX::ThreadedSocketConnection::m_address
private

Definition at line 88 of file ThreadedSocketConnection.h.

◆ m_buffer

char FIX::ThreadedSocketConnection::m_buffer[BUFSIZ]
private

Definition at line 86 of file ThreadedSocketConnection.h.

◆ m_disconnect

bool FIX::ThreadedSocketConnection::m_disconnect
private

Definition at line 97 of file ThreadedSocketConnection.h.

◆ m_fds

fd_set FIX::ThreadedSocketConnection::m_fds
private

Definition at line 98 of file ThreadedSocketConnection.h.

◆ m_parser

Parser FIX::ThreadedSocketConnection::m_parser
private

Definition at line 94 of file ThreadedSocketConnection.h.

◆ m_pLog

Log* FIX::ThreadedSocketConnection::m_pLog
private

Definition at line 93 of file ThreadedSocketConnection.h.

◆ m_port

int FIX::ThreadedSocketConnection::m_port
private

Definition at line 89 of file ThreadedSocketConnection.h.

◆ m_pSession

Session* FIX::ThreadedSocketConnection::m_pSession
private

Definition at line 96 of file ThreadedSocketConnection.h.

◆ m_sessions

Sessions FIX::ThreadedSocketConnection::m_sessions
private

Definition at line 95 of file ThreadedSocketConnection.h.

◆ m_socket

int FIX::ThreadedSocketConnection::m_socket
private

Definition at line 85 of file ThreadedSocketConnection.h.

◆ m_sourceAddress

std::string FIX::ThreadedSocketConnection::m_sourceAddress
private

Definition at line 90 of file ThreadedSocketConnection.h.

◆ m_sourcePort

int FIX::ThreadedSocketConnection::m_sourcePort
private

Definition at line 91 of file ThreadedSocketConnection.h.


The documentation for this class was generated from the following files:
FIX::ThreadedSocketConnection::readMessage
bool readMessage(std::string &msg)
Definition: ThreadedSocketConnection.cpp:162
FIX::ThreadedSocketConnection::setSession
bool setSession(const std::string &msg)
Definition: ThreadedSocketConnection.cpp:198
FIX::Session::lookupSession
static Session * lookupSession(const SessionID &)
Definition: Session.cpp:1513
FIX::Session::unregisterSession
static void unregisterSession(const SessionID &)
Definition: Session.cpp:1564
FIX::ThreadedSocketConnection::send
bool send(const std::string &)
Definition: ThreadedSocketConnection.cpp:86
FIX::Session::getSessionID
const SessionID & getSessionID() const
Definition: Session.h:109
FIX::Session::registerSession
static Session * registerSession(const SessionID &)
Definition: Session.cpp:1554
FIX::ThreadedSocketConnection::processStream
void processStream()
Definition: ThreadedSocketConnection.cpp:173
FIX::ThreadedSocketConnection::disconnect
void disconnect()
Definition: ThreadedSocketConnection.cpp:108
FIX::Session::next
void next()
Definition: Session.cpp:142
FIX::socket_recv
ssize_t socket_recv(int s, char *buf, size_t length)
Definition: Utility.cpp:187
FIX::TYPE::UtcTimeStamp
@ UtcTimeStamp
Definition: FieldTypes.h:940
FIX::ThreadedSocketConnection::m_pSession
Session * m_pSession
Definition: ThreadedSocketConnection.h:96
FIX::Log::onEvent
virtual void onEvent(const std::string &)=0
FIX::Session::getLog
Log * getLog()
Definition: Session.h:261
FIX::Session::isSessionRegistered
static bool isSessionRegistered(const SessionID &)
Definition: Session.cpp:1548
FIX::socket_send
ssize_t socket_send(int s, const char *msg, size_t length)
Definition: Utility.cpp:192
FIX::ThreadedSocketConnection::m_fds
fd_set m_fds
Definition: ThreadedSocketConnection.h:98
FIX::ThreadedSocketConnection::read
bool read()
Definition: ThreadedSocketConnection.cpp:114
FIX::ThreadedSocketConnection::m_disconnect
bool m_disconnect
Definition: ThreadedSocketConnection.h:97
FIX::process_sleep
void process_sleep(double s)
Definition: Utility.cpp:483
FIX::ThreadedSocketConnection::m_parser
Parser m_parser
Definition: ThreadedSocketConnection.h:94
FIX::Parser::addToStream
void addToStream(const char *str, size_t len)
Definition: Parser.h:82
FIX::Session::disconnect
void disconnect()
Definition: Session.cpp:630
FIX::Parser::readFixMessage
bool readFixMessage(std::string &str)
Definition: Parser.cpp:76
FIX::ThreadedSocketConnection::m_socket
int m_socket
Definition: ThreadedSocketConnection.h:85
FIX::socket_close
void socket_close(int s)
Definition: Utility.cpp:197
FIX::ThreadedSocketConnection::m_buffer
char m_buffer[BUFSIZ]
Definition: ThreadedSocketConnection.h:86
FIX::Session::setResponder
void setResponder(Responder *pR)
Definition: Session.h:244
FIX::ThreadedSocketConnection::m_pLog
Log * m_pLog
Definition: ThreadedSocketConnection.h:93
FIX::ThreadedSocketConnection::m_sessions
Sessions m_sessions
Definition: ThreadedSocketConnection.h:95
FIX::Log::onIncoming
virtual void onIncoming(const std::string &)=0

Generated on Wed Apr 29 2020 19:41:30 for QuickFIX by doxygen 1.8.17 written by Dimitri van Heesch, © 1997-2001