Index  Source Files  Annotated Class List  Alphabetical Class List  Class Hierarchy  Graphical Class Hierarchy 

ThreadedSocketConnection.cpp

Go to the documentation of this file.
00001 /****************************************************************************
00002 ** Copyright (c) quickfixengine.org  All rights reserved.
00003 **
00004 ** This file is part of the QuickFIX FIX Engine
00005 **
00006 ** This file may be distributed under the terms of the quickfixengine.org
00007 ** license as defined by quickfixengine.org and appearing in the file
00008 ** LICENSE included in the packaging of this file.
00009 **
00010 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
00011 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
00012 **
00013 ** See http://www.quickfixengine.org/LICENSE for licensing information.
00014 **
00015 ** Contact ask@quickfixengine.org if any conditions of this licensing are
00016 ** not clear to you.
00017 **
00018 ****************************************************************************/
00019 
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026 
00027 #include "ThreadedSocketConnection.h"
00028 #include "ThreadedSocketAcceptor.h"
00029 #include "ThreadedSocketInitiator.h"
00030 #include "Session.h"
00031 #include "Utility.h"
00032 
00033 namespace FIX
00034 {
00035 ThreadedSocketConnection::ThreadedSocketConnection
00036 ( int s, Sessions sessions, Application& application, Log* pLog )
00037 : m_socket( s ), m_application( application ), m_pLog( pLog ),
00038   m_sessions( sessions ), m_pSession( 0 ),
00039   m_disconnect( false )
00040 {
00041   FD_ZERO( &m_fds );
00042   FD_SET( m_socket, &m_fds );
00043 }
00044 
00045 ThreadedSocketConnection::ThreadedSocketConnection
00046 ( const SessionID& sessionID, int s,
00047   const std::string& address, short port, 
00048   Application& application, Log* pLog )
00049   : m_socket( s ), m_address( address ), m_port( port ),
00050     m_application( application ), m_pLog( m_pLog ),
00051     m_pSession( Session::lookupSession( sessionID ) ),
00052     m_disconnect( false )
00053 {
00054   FD_ZERO( &m_fds );
00055   FD_SET( m_socket, &m_fds );
00056   if ( m_pSession ) m_pSession->setResponder( this );
00057 }
00058 
00059 ThreadedSocketConnection::~ThreadedSocketConnection()
00060 {
00061   if ( m_pSession )
00062   {
00063     m_pSession->setResponder( 0 );
00064     Session::unregisterSession( m_pSession->getSessionID() );
00065   }
00066 }
00067 
00068 bool ThreadedSocketConnection::send( const std::string& msg )
00069 { QF_STACK_PUSH(ThreadedSocketConnection::send)
00070   return socket_send( m_socket, msg.c_str(), msg.length() ) >= 0;
00071   QF_STACK_POP
00072 }
00073 
00074 bool ThreadedSocketConnection::connect()
00075 { QF_STACK_PUSH(ThreadedSocketConnection::connect)
00076   return socket_connect(getSocket(), m_address.c_str(), m_port) >= 0;
00077   QF_STACK_POP
00078 }
00079 
00080 void ThreadedSocketConnection::disconnect()
00081 { QF_STACK_PUSH(ThreadedSocketConnection::disconnect)
00082   
00083   m_disconnect = true;
00084   socket_close( m_socket );
00085 
00086   QF_STACK_POP
00087 }
00088 
00089 bool ThreadedSocketConnection::read()
00090 { QF_STACK_PUSH(ThreadedSocketConnection::read)
00091 
00092   struct timeval timeout = { 1, 0 };
00093   fd_set readset = m_fds;
00094 
00095   try
00096   {
00097     // Wait for input (1 second timeout)
00098     int result = select( 1 + m_socket, &readset, 0, 0, &timeout );
00099 
00100     if( result > 0 ) // Something to read
00101     {
00102       // We can read without blocking
00103       int size = recv( m_socket, m_buffer, sizeof(m_buffer), 0 );
00104       if ( size <= 0 ) { throw SocketRecvFailed( size ); }
00105       m_parser.addToStream( m_buffer, size );
00106     }
00107     else if( result == 0 && m_pSession ) // Timeout
00108     {
00109       m_pSession->next();
00110     }
00111     else if( result < 0 ) // Error
00112     {
00113       throw SocketRecvFailed( result );
00114     }
00115 
00116     processStream();
00117     return true;
00118   }
00119   catch ( SocketRecvFailed& e )
00120   {
00121     if( m_disconnect )
00122       return false;
00123 
00124     if( m_pSession )
00125     {
00126       m_pSession->getLog()->onEvent( e.what() );
00127       m_pSession->disconnect();
00128     }
00129     else
00130     {
00131       disconnect();
00132     }
00133 
00134     return false;
00135   }
00136 
00137   QF_STACK_POP
00138 }
00139 
00140 bool ThreadedSocketConnection::readMessage( std::string& msg )
00141 throw( SocketRecvFailed )
00142 { QF_STACK_PUSH(ThreadedSocketConnection::readMessage)
00143 
00144   try
00145   {
00146     return m_parser.readFixMessage( msg );
00147   }
00148   catch ( MessageParseError& ) {}
00149   return true;
00150 
00151   QF_STACK_POP
00152 }
00153 
00154 void ThreadedSocketConnection::processStream()
00155 { QF_STACK_PUSH(ThreadedSocketConnection::processStream)
00156 
00157   std::string msg;
00158   while( readMessage(msg) )
00159   {
00160     if ( !m_pSession )
00161     {
00162       if ( !setSession( msg ) )
00163       { disconnect(); continue; }
00164     }
00165     try
00166     {
00167       m_pSession->next( msg, UtcTimeStamp() );
00168     }
00169     catch( InvalidMessage& )
00170     {
00171       if( !m_pSession->isLoggedOn() )
00172       {
00173         disconnect();
00174         return;
00175       }
00176     }
00177   }
00178 
00179   QF_STACK_POP
00180 }
00181 
00182 bool ThreadedSocketConnection::setSession( const std::string& msg )
00183 { QF_STACK_PUSH(ThreadedSocketConnection::setSession)
00184 
00185   m_pSession = Session::lookupSession( msg, true );
00186   if ( !m_pSession ) 
00187   {
00188     if( m_pLog )
00189     {
00190       m_pLog->onEvent( "Session not found for incoming message: " + msg );
00191       m_pLog->onIncoming( msg );
00192     }
00193     return false;
00194   }
00195 
00196   SessionID sessionID = m_pSession->getSessionID();
00197   m_pSession = 0;
00198 
00199   // see if the session frees up within 5 seconds
00200   for( int i = 1; i <= 5; i++ )
00201   {
00202     if( !Session::isSessionRegistered( sessionID ) )
00203       m_pSession = Session::registerSession( sessionID );
00204     if( m_pSession ) break;
00205     process_sleep( 1 );
00206   }
00207 
00208   if ( !m_pSession ) 
00209     return false;
00210   if ( m_sessions.find(m_pSession->getSessionID()) == m_sessions.end() )
00211     return false;
00212 
00213   m_pSession->setResponder( this );
00214   return true;
00215 
00216   QF_STACK_POP
00217 }
00218 
00219 } // namespace FIX

Generated on Mon Apr 5 20:59:51 2010 for QuickFIX by doxygen 1.6.1 written by Dimitri van Heesch, © 1997-2001