Index  Source Files  Annotated Class List  Alphabetical Class List  Class Hierarchy  Graphical Class Hierarchy 

SocketInitiator.cpp

Go to the documentation of this file.
00001 /****************************************************************************
00002 ** Copyright (c) quickfixengine.org  All rights reserved.
00003 **
00004 ** This file is part of the QuickFIX FIX Engine
00005 **
00006 ** This file may be distributed under the terms of the quickfixengine.org
00007 ** license as defined by quickfixengine.org and appearing in the file
00008 ** LICENSE included in the packaging of this file.
00009 **
00010 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
00011 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
00012 **
00013 ** See http://www.quickfixengine.org/LICENSE for licensing information.
00014 **
00015 ** Contact ask@quickfixengine.org if any conditions of this licensing are
00016 ** not clear to you.
00017 **
00018 ****************************************************************************/
00019 
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026 
00027 #include "SocketInitiator.h"
00028 #include "Session.h"
00029 #include "Settings.h"
00030 
00031 namespace FIX
00032 {
00033 SocketInitiator::SocketInitiator( Application& application,
00034                                   MessageStoreFactory& factory,
00035                                   const SessionSettings& settings )
00036 throw( ConfigError )
00037 : Initiator( application, factory, settings ),
00038   m_connector( 1 ), m_lastConnect( 0 ),
00039   m_reconnectInterval( 30 ), m_noDelay( false ), m_sendBufSize( 0 ),
00040   m_rcvBufSize( 0 ) 
00041 {
00042 }
00043 
00044 SocketInitiator::SocketInitiator( Application& application,
00045                                   MessageStoreFactory& factory,
00046                                   const SessionSettings& settings,
00047                                   LogFactory& logFactory )
00048 throw( ConfigError )
00049 : Initiator( application, factory, settings, logFactory ),
00050   m_connector( 1 ), m_lastConnect( 0 ),
00051   m_reconnectInterval( 30 ), m_noDelay( false ), m_sendBufSize( 0 ),
00052   m_rcvBufSize( 0 )
00053 {
00054 }
00055 
00056 SocketInitiator::~SocketInitiator()
00057 {
00058   SocketConnections::iterator i;
00059   for (i = m_connections.begin();
00060        i != m_connections.end(); ++i)
00061     delete i->second;
00062 
00063   for (i = m_pendingConnections.begin();
00064        i != m_pendingConnections.end(); ++i)
00065     delete i->second;
00066 }
00067 
00068 void SocketInitiator::onConfigure( const SessionSettings& s )
00069 throw ( ConfigError )
00070 { QF_STACK_PUSH(SocketInitiator::onConfigure)
00071 
00072   try { m_reconnectInterval = s.get().getLong( RECONNECT_INTERVAL ); }
00073   catch ( std::exception& ) {}
00074   if( s.get().has( SOCKET_NODELAY ) )
00075     m_noDelay = s.get().getBool( SOCKET_NODELAY );
00076   if( s.get().has( SOCKET_SEND_BUFFER_SIZE ) )
00077     m_sendBufSize = s.get().getLong( SOCKET_SEND_BUFFER_SIZE );
00078   if( s.get().has( SOCKET_RECEIVE_BUFFER_SIZE ) )
00079     m_rcvBufSize = s.get().getLong( SOCKET_RECEIVE_BUFFER_SIZE );
00080 
00081   QF_STACK_POP
00082 }
00083 
00084 void SocketInitiator::onInitialize( const SessionSettings& s )
00085 throw ( RuntimeError )
00086 { QF_STACK_PUSH(SocketInitiator::onInitialize)
00087   QF_STACK_POP
00088 }
00089 
00090 void SocketInitiator::onStart()
00091 { QF_STACK_PUSH(SocketInitiator::onStart)
00092 
00093   connect();
00094 
00095   while ( !isStopped() )
00096     m_connector.block( *this );
00097 
00098   time_t start = 0;
00099   time_t now = 0;
00100 
00101   ::time( &start );
00102   while ( isLoggedOn() )
00103   {
00104     m_connector.block( *this );
00105     if( ::time(&now) -5 >= start )
00106       break;
00107   }
00108 
00109   QF_STACK_POP
00110 }
00111 
00112 bool SocketInitiator::onPoll( double timeout )
00113 { QF_STACK_PUSH(SocketInitiator::onPoll)
00114 
00115   time_t start = 0;
00116   time_t now = 0;
00117 
00118   if( isStopped() )
00119   {
00120     if( start == 0 )
00121       ::time( &start );
00122     if( !isLoggedOn() )
00123       return false;
00124     if( ::time(&now) - 5 >= start )
00125       return false;
00126   }
00127 
00128   m_connector.block( *this, true, timeout );
00129   return true;
00130 
00131   QF_STACK_POP
00132 }
00133 
00134 void SocketInitiator::onStop()
00135 { QF_STACK_PUSH(SocketInitiator::onStop)
00136   QF_STACK_POP
00137 }
00138 
00139 void SocketInitiator::doConnect( const SessionID& s, const Dictionary& d )
00140 { QF_STACK_PUSH(SocketInitiator::doConnect)
00141 
00142   try
00143   {
00144     std::string address;
00145     short port = 0;
00146     Session* session = Session::lookupSession( s );
00147     if( !session->isSessionTime(UtcTimeStamp()) ) return;
00148 
00149     Log* log = session->getLog();
00150 
00151     getHost( s, d, address, port );
00152 
00153     log->onEvent( "Connecting to " + address + " on port " + IntConvertor::convert((unsigned short)port) );
00154     int result = m_connector.connect( address, port, m_noDelay, m_sendBufSize, m_rcvBufSize );
00155 
00156     if( result != -1 )
00157     {
00158       setPending( s );
00159 
00160       m_pendingConnections[ result ] 
00161         = new SocketConnection( *this, s, result, &m_connector.getMonitor() );
00162     }
00163   }
00164   catch ( std::exception& ) {}
00165 
00166   QF_STACK_POP
00167 }
00168 
00169 void SocketInitiator::onConnect( SocketConnector&, int s )
00170 { QF_STACK_PUSH(SocketInitiator::onConnect)
00171 
00172   SocketConnections::iterator i = m_pendingConnections.find( s );
00173   if( i == m_pendingConnections.end() ) return;
00174   SocketConnection* pSocketConnection = i->second;
00175   
00176   m_connections[s] = pSocketConnection;
00177   m_pendingConnections.erase( i );
00178   setConnected( pSocketConnection->getSession()->getSessionID() );
00179   pSocketConnection->onTimeout();
00180 
00181   QF_STACK_POP
00182 }
00183 
00184 void SocketInitiator::onWrite( SocketConnector& connector, int s )
00185 { QF_STACK_PUSH(SocketInitiator::onWrite)
00186 
00187   SocketConnections::iterator i = m_connections.find( s );
00188   if ( i == m_connections.end() ) return ;
00189   SocketConnection* pSocketConnection = i->second;
00190   if( pSocketConnection->processQueue() )
00191     pSocketConnection->unsignal();
00192   
00193   QF_STACK_POP
00194 }
00195 
00196 bool SocketInitiator::onData( SocketConnector& connector, int s )
00197 { QF_STACK_PUSH(SocketInitiator::onData)
00198 
00199   SocketConnections::iterator i = m_connections.find( s );
00200   if ( i == m_connections.end() ) return false;
00201   SocketConnection* pSocketConnection = i->second;
00202   return pSocketConnection->read( connector );
00203 
00204   QF_STACK_POP
00205 }
00206 
00207 void SocketInitiator::onDisconnect( SocketConnector&, int s )
00208 { QF_STACK_PUSH(SocketInitiator::onDisconnect)
00209 
00210   SocketConnections::iterator i = m_connections.find( s );
00211   SocketConnections::iterator j = m_pendingConnections.find( s );
00212 
00213   SocketConnection* pSocketConnection = 0;
00214   if( i != m_connections.end() ) 
00215           pSocketConnection = i->second;
00216   if( j != m_pendingConnections.end() )
00217           pSocketConnection = j->second;
00218   if( !pSocketConnection )
00219           return;
00220 
00221   setDisconnected( pSocketConnection->getSession()->getSessionID() );
00222 
00223   Session* pSession = pSocketConnection->getSession();
00224   if ( pSession )
00225   {
00226     pSession->disconnect();
00227     setDisconnected( pSession->getSessionID() );
00228   }
00229 
00230   delete pSocketConnection;
00231   m_connections.erase( s );
00232   m_pendingConnections.erase( s );
00233 
00234   QF_STACK_POP
00235 }
00236 
00237 void SocketInitiator::onError( SocketConnector& connector )
00238 { QF_STACK_PUSH(SocketInitiator::onError)
00239   onTimeout( connector );
00240   QF_STACK_POP
00241 }
00242 
00243 void SocketInitiator::onTimeout( SocketConnector& )
00244 { QF_STACK_PUSH(SocketInitiator::onTimeout)
00245 
00246   time_t now;
00247   ::time( &now );
00248 
00249   if ( (now - m_lastConnect) >= m_reconnectInterval )
00250   {
00251     connect();
00252     m_lastConnect = now;
00253   }
00254 
00255   SocketConnections::iterator i;
00256   for ( i = m_connections.begin(); i != m_connections.end(); ++i )
00257     i->second->onTimeout();
00258 
00259   QF_STACK_POP
00260 }
00261 
00262 void SocketInitiator::getHost( const SessionID& s, const Dictionary& d,
00263                                std::string& address, short& port )
00264 { QF_STACK_PUSH(SocketInitiator::getHost)
00265 
00266   int num = 0;
00267   SessionToHostNum::iterator i = m_sessionToHostNum.find( s );
00268   if ( i != m_sessionToHostNum.end() ) num = i->second;
00269 
00270   std::stringstream hostStream;
00271   hostStream << SOCKET_CONNECT_HOST << num;
00272   std::string hostString = hostStream.str();
00273 
00274   std::stringstream portStream;
00275   std::string portString = portStream.str();
00276   portStream << SOCKET_CONNECT_PORT << num;
00277 
00278   if( d.has(hostString) && d.has(portString) )
00279   {
00280     address = d.getString( hostString );
00281     port = ( short ) d.getLong( portString );
00282   }
00283   else
00284   {
00285     num = 0;
00286     address = d.getString( SOCKET_CONNECT_HOST );
00287     port = ( short ) d.getLong( SOCKET_CONNECT_PORT );
00288   }
00289 
00290   m_sessionToHostNum[ s ] = ++num;
00291 
00292   QF_STACK_POP
00293 }
00294 }

Generated on Mon Apr 5 20:59:51 2010 for QuickFIX by doxygen 1.6.1 written by Dimitri van Heesch, © 1997-2001