Index  Source Files  Annotated Class List  Alphabetical Class List  Class Hierarchy  Graphical Class Hierarchy 

FIX::SocketMonitor Class Reference

Monitors events on a collection of sockets. More...

#include <SocketMonitor.h>

List of all members.

Classes

class  Strategy

Public Member Functions

 SocketMonitor (int timeout=0)
virtual ~SocketMonitor ()
bool addConnect (int socket)
bool addRead (int socket)
bool addWrite (int socket)
bool drop (int socket)
void signal (int socket)
void unsignal (int socket)
void block (Strategy &strategy, bool poll=0, double timeout=0.0)
int numSockets ()

Private Types

typedef std::set< int > Sockets
typedef std::queue< int > Queue

Private Member Functions

void setsockopt ()
bool bind ()
bool listen ()
void buildSet (const Sockets &, fd_set &)
timeval * getTimeval (bool poll, double timeout)
bool sleepIfEmpty (bool poll)
void processReadSet (Strategy &, fd_set &)
void processWriteSet (Strategy &, fd_set &)
void processExceptSet (Strategy &, fd_set &)

Private Attributes

int m_timeout
timeval m_timeval
clock_t m_ticks
int m_signal
int m_interrupt
Sockets m_connectSockets
Sockets m_readSockets
Sockets m_writeSockets
Queue m_dropped

Detailed Description

Monitors events on a collection of sockets.

Definition at line 47 of file SocketMonitor.h.


Member Typedef Documentation

typedef std::queue< int > FIX::SocketMonitor::Queue [private]

Definition at line 68 of file SocketMonitor.h.

typedef std::set< int > FIX::SocketMonitor::Sockets [private]

Definition at line 67 of file SocketMonitor.h.


Constructor & Destructor Documentation

FIX::SocketMonitor::SocketMonitor ( int  timeout = 0  ) 

Definition at line 36 of file SocketMonitor.cpp.

References m_interrupt, m_readSockets, m_signal, m_ticks, m_timeval, FIX::socket_createpair(), FIX::socket_init(), and FIX::socket_setnonblock().

00037 : m_timeout( timeout )
00038 {
00039   socket_init();
00040 
00041   std::pair<int, int> sockets = socket_createpair();
00042   m_signal = sockets.first;
00043   m_interrupt = sockets.second;
00044   socket_setnonblock( m_signal );
00045   socket_setnonblock( m_interrupt );
00046   m_readSockets.insert( m_interrupt );
00047 
00048   m_timeval.tv_sec = 0;
00049   m_timeval.tv_usec = 0;
00050 #ifndef SELECT_DECREMENTS_TIME
00051   m_ticks = clock();
00052 #endif
00053 }

FIX::SocketMonitor::~SocketMonitor (  )  [virtual]

Definition at line 55 of file SocketMonitor.cpp.

References m_readSockets, m_signal, FIX::socket_close(), and FIX::socket_term().

00056 {
00057   Sockets::iterator i;
00058   for ( i = m_readSockets.begin(); i != m_readSockets.end(); ++i ) {
00059     socket_close( *i );
00060   }
00061 
00062   socket_close( m_signal );
00063   socket_term();
00064 }


Member Function Documentation

bool FIX::SocketMonitor::addConnect ( int  socket  ) 

Definition at line 66 of file SocketMonitor.cpp.

References m_connectSockets, QF_STACK_POP, QF_STACK_PUSH, and FIX::socket_setnonblock().

Referenced by FIX::SocketServer::accept(), and FIX::SocketConnector::connect().

00067 { QF_STACK_PUSH(SocketMonitor::addConnect)
00068 
00069   socket_setnonblock( s );
00070   Sockets::iterator i = m_connectSockets.find( s );
00071   if( i != m_connectSockets.end() ) return false;
00072 
00073   m_connectSockets.insert( s );
00074   return true;
00075 
00076   QF_STACK_POP
00077 }

bool FIX::SocketMonitor::addRead ( int  socket  ) 

Definition at line 79 of file SocketMonitor.cpp.

References m_readSockets, QF_STACK_POP, QF_STACK_PUSH, and FIX::socket_setnonblock().

00080 { QF_STACK_PUSH(SocketMonitor::addRead)
00081 
00082   socket_setnonblock( s );
00083   Sockets::iterator i = m_readSockets.find( s );
00084   if( i != m_readSockets.end() ) return false;
00085 
00086   m_readSockets.insert( s );
00087   return true;
00088 
00089   QF_STACK_POP
00090 }

bool FIX::SocketMonitor::addWrite ( int  socket  ) 

Definition at line 92 of file SocketMonitor.cpp.

References m_readSockets, m_writeSockets, QF_STACK_POP, QF_STACK_PUSH, and FIX::socket_setnonblock().

Referenced by processReadSet().

00093 { QF_STACK_PUSH(SocketMonitor::addWrite)
00094 
00095   if( m_readSockets.find(s) == m_readSockets.end() )
00096     return false;
00097 
00098   socket_setnonblock( s );
00099   Sockets::iterator i = m_writeSockets.find( s );
00100   if( i != m_writeSockets.end() ) return false;
00101 
00102   m_writeSockets.insert( s );
00103   return true;
00104 
00105   QF_STACK_POP
00106 }

bool FIX::SocketMonitor::bind (  )  [private]
void FIX::SocketMonitor::block ( Strategy strategy,
bool  poll = 0,
double  timeout = 0.0 
)

Definition at line 204 of file SocketMonitor.cpp.

References buildSet(), getTimeval(), m_connectSockets, m_dropped, m_readSockets, m_writeSockets, FIX::SocketMonitor::Strategy::onError(), FIX::SocketMonitor::Strategy::onTimeout(), FIX::Queue< T >::pop(), processExceptSet(), processReadSet(), processWriteSet(), QF_STACK_POP, QF_STACK_PUSH, FIX::Queue< T >::size(), and sleepIfEmpty().

Referenced by FIX::SocketServer::block(), and FIX::SocketConnector::block().

00205 { QF_STACK_PUSH(SocketMonitor::block)
00206 
00207   while ( m_dropped.size() )
00208   {
00209     strategy.onError( *this, m_dropped.front() );
00210     m_dropped.pop();
00211     if ( m_dropped.size() == 0 )
00212       return ;
00213   }
00214 
00215   fd_set readSet;
00216   FD_ZERO( &readSet );
00217   buildSet( m_readSockets, readSet );
00218   fd_set writeSet;
00219   FD_ZERO( &writeSet );
00220   buildSet( m_connectSockets, writeSet );
00221   buildSet( m_writeSockets, writeSet );
00222   fd_set exceptSet;
00223   FD_ZERO( &exceptSet );
00224   buildSet( m_connectSockets, exceptSet );
00225 
00226   if ( sleepIfEmpty(poll) )
00227   {
00228     strategy.onTimeout( *this );
00229     return;
00230   }
00231 
00232   int result = select( FD_SETSIZE, &readSet, &writeSet, &exceptSet, getTimeval(poll, timeout) );
00233 
00234   if ( result == 0 )
00235   {
00236     strategy.onTimeout( *this );
00237     return;
00238   }
00239   else if ( result > 0 )
00240   {
00241     processExceptSet( strategy, exceptSet );
00242     processWriteSet( strategy, writeSet );
00243     processReadSet( strategy, readSet );
00244   }
00245   else
00246   {
00247     strategy.onError( *this );
00248   }
00249 
00250   QF_STACK_POP
00251 }

void FIX::SocketMonitor::buildSet ( const Sockets sockets,
fd_set &  watchSet 
) [private]

Definition at line 363 of file SocketMonitor.cpp.

References QF_STACK_POP, and QF_STACK_PUSH.

Referenced by block().

00364 { QF_STACK_PUSH(SocketMonitor::buildSet)
00365 
00366   Sockets::const_iterator iter;
00367   for ( iter = sockets.begin(); iter != sockets.end(); ++iter ) {
00368     FD_SET( *iter, &watchSet );
00369   }
00370   QF_STACK_POP
00371 }

bool FIX::SocketMonitor::drop ( int  socket  ) 

Definition at line 108 of file SocketMonitor.cpp.

References m_connectSockets, m_dropped, m_readSockets, m_writeSockets, FIX::Queue< T >::push(), QF_STACK_POP, QF_STACK_PUSH, and FIX::socket_close().

Referenced by FIX::SocketConnection::disconnect(), FIX::HttpServer::onConnect(), FIX::ServerWrapper::onError(), FIX::SocketConnection::read(), and FIX::SocketConnection::readMessages().

00109 { QF_STACK_PUSH(SocketMonitor::drop)
00110 
00111   Sockets::iterator i = m_readSockets.find( s );
00112   Sockets::iterator j = m_writeSockets.find( s );
00113   Sockets::iterator k = m_connectSockets.find( s );
00114 
00115   if ( i != m_readSockets.end() || 
00116        j != m_writeSockets.end() ||
00117        k != m_connectSockets.end() )
00118   {
00119     socket_close( s );
00120     m_readSockets.erase( s );
00121     m_writeSockets.erase( s );
00122     m_connectSockets.erase( s );
00123     m_dropped.push( s );
00124     return true;
00125   }
00126   return false;
00127 
00128   QF_STACK_POP
00129 }

timeval * FIX::SocketMonitor::getTimeval ( bool  poll,
double  timeout 
) [inline, private]

Definition at line 131 of file SocketMonitor.cpp.

References m_ticks, m_timeout, m_timeval, QF_STACK_POP, and QF_STACK_PUSH.

Referenced by block().

00132 { QF_STACK_PUSH(SocketMonitor::getTimeval)
00133 
00134   if ( poll )
00135   {
00136     m_timeval.tv_sec = 0;
00137     m_timeval.tv_usec = 0;
00138     return &m_timeval;
00139   }
00140 
00141   timeout = m_timeout;
00142 
00143   if ( !timeout )
00144     return 0;
00145 #ifdef SELECT_MODIFIES_TIMEVAL
00146   if ( !m_timeval.tv_sec && !m_timeval.tv_usec && timeout )
00147     m_timeval.tv_sec = timeout;
00148   return &m_timeval;
00149 #else
00150 double elapsed = ( double ) ( clock() - m_ticks ) / ( double ) CLOCKS_PER_SEC;
00151   if ( elapsed >= timeout || elapsed == 0.0 )
00152   {
00153     m_ticks = clock();
00154     m_timeval.tv_sec = 0;
00155     m_timeval.tv_usec = (long)(timeout * 1000000);
00156   }
00157   else
00158   {
00159     m_timeval.tv_sec = 0;
00160     m_timeval.tv_usec = ( long ) ( ( timeout - elapsed ) * 1000000 );
00161   }
00162   return &m_timeval;
00163 #endif
00164 
00165   QF_STACK_POP
00166 }

bool FIX::SocketMonitor::listen (  )  [private]
int FIX::SocketMonitor::numSockets (  )  [inline]

Definition at line 63 of file SocketMonitor.h.

References m_readSockets.

Referenced by FIX::SocketServer::numConnections().

00064   { return m_readSockets.size() - 1; }

void FIX::SocketMonitor::processExceptSet ( Strategy strategy,
fd_set &  exceptSet 
) [private]

Definition at line 339 of file SocketMonitor.cpp.

References m_connectSockets, FIX::SocketMonitor::Strategy::onError(), QF_STACK_POP, and QF_STACK_PUSH.

Referenced by block().

00340 { QF_STACK_PUSH(SocketMonitor::processExceptSet)
00341 
00342 #ifdef _MSC_VER
00343   for ( unsigned i = 0; i < exceptSet.fd_count; ++i )
00344   {
00345     int s = exceptSet.fd_array[ i ];
00346     strategy.onError( *this, s );
00347   }
00348 #else
00349     Sockets::iterator i;
00350     Sockets sockets = m_connectSockets;
00351     for ( i = sockets.begin(); i != sockets.end(); ++i )
00352     {
00353       int s = *i;
00354       if ( !FD_ISSET( *i, &exceptSet ) )
00355         continue;
00356       strategy.onError( *this, s );
00357     }
00358 #endif
00359 
00360   QF_STACK_POP
00361 }

void FIX::SocketMonitor::processReadSet ( Strategy strategy,
fd_set &  readSet 
) [private]

Definition at line 253 of file SocketMonitor.cpp.

References addWrite(), m_interrupt, m_readSockets, FIX::SocketMonitor::Strategy::onEvent(), QF_STACK_POP, and QF_STACK_PUSH.

Referenced by block().

00254 { QF_STACK_PUSH(SocketMonitor::processReadSet)
00255 
00256 #ifdef _MSC_VER
00257   for ( unsigned i = 0; i < readSet.fd_count; ++i )
00258   {
00259     int s = readSet.fd_array[ i ];
00260     if( s == m_interrupt )
00261     {
00262       int socket = 0;
00263       recv( s, (char*)&socket, sizeof(socket), 0 );
00264       addWrite( socket );
00265     }
00266     else
00267     {
00268       strategy.onEvent( *this, s );
00269     }
00270   }
00271 #else
00272     Sockets::iterator i;
00273     Sockets sockets = m_readSockets;
00274     for ( i = sockets.begin(); i != sockets.end(); ++i )
00275     {
00276       int s = *i;
00277       if ( !FD_ISSET( *i, &readSet ) )
00278         continue;
00279       if( s == m_interrupt )
00280       {
00281         int socket = 0;
00282         recv( s, (char*)&socket, sizeof(socket), 0 );
00283         addWrite( socket );
00284       }
00285       else
00286       {
00287         strategy.onEvent( *this, s );
00288       }
00289     }
00290 #endif
00291 
00292   QF_STACK_POP
00293 }

void FIX::SocketMonitor::processWriteSet ( Strategy strategy,
fd_set &  writeSet 
) [private]

Definition at line 295 of file SocketMonitor.cpp.

References m_connectSockets, m_readSockets, m_writeSockets, FIX::SocketMonitor::Strategy::onConnect(), FIX::SocketMonitor::Strategy::onWrite(), QF_STACK_POP, and QF_STACK_PUSH.

Referenced by block().

00296 { QF_STACK_PUSH(SocketMonitor::processWriteSet)
00297 
00298 #ifdef _MSC_VER
00299   for ( unsigned i = 0; i < writeSet.fd_count; ++i )
00300   {
00301     int s = writeSet.fd_array[ i ];
00302     if( m_connectSockets.find(s) != m_connectSockets.end() )
00303     {
00304       m_connectSockets.erase( s );
00305       m_readSockets.insert( s );
00306       strategy.onConnect( *this, s );
00307     }
00308     else
00309     {
00310       strategy.onWrite( *this, s );
00311     }
00312   }
00313 #else
00314   Sockets::iterator i;
00315   Sockets sockets = m_connectSockets;
00316   for( i = sockets.begin(); i != sockets.end(); ++i )
00317   {
00318     int s = *i;
00319     if ( !FD_ISSET( *i, &writeSet ) )
00320       continue;
00321     m_connectSockets.erase( s );
00322     m_readSockets.insert( s );
00323     strategy.onConnect( *this, s );
00324   }
00325 
00326   sockets = m_writeSockets;
00327   for( i = sockets.begin(); i != sockets.end(); ++i )
00328   {
00329     int s = *i;
00330     if ( !FD_ISSET( *i, &writeSet ) )
00331       continue;
00332     strategy.onWrite( *this, s );
00333   }
00334 #endif
00335 
00336   QF_STACK_POP
00337 }

void FIX::SocketMonitor::setsockopt (  )  [private]
void FIX::SocketMonitor::signal ( int  socket  ) 

Definition at line 187 of file SocketMonitor.cpp.

References m_signal, QF_STACK_POP, QF_STACK_PUSH, and FIX::socket_send().

Referenced by FIX::SocketConnection::signal().

00188 { QF_STACK_PUSH(SocketMonitor::signal)
00189   socket_send( m_signal, (char*)&socket, sizeof(socket) );
00190   QF_STACK_POP
00191 }

bool FIX::SocketMonitor::sleepIfEmpty ( bool  poll  )  [inline, private]

Definition at line 168 of file SocketMonitor.cpp.

References m_connectSockets, m_readSockets, m_timeout, m_writeSockets, FIX::process_sleep(), QF_STACK_POP, and QF_STACK_PUSH.

Referenced by block().

00169 { QF_STACK_PUSH(SocketMonitor::sleepIfEmpty)
00170 
00171   if( poll )
00172     return false;
00173 
00174   if ( m_readSockets.empty() && 
00175        m_writeSockets.empty() &&
00176        m_connectSockets.empty() )
00177   {
00178     process_sleep( m_timeout );
00179     return true;
00180   }
00181   else
00182     return false;
00183 
00184   QF_STACK_POP
00185 }

void FIX::SocketMonitor::unsignal ( int  socket  ) 

Definition at line 193 of file SocketMonitor.cpp.

References m_writeSockets, QF_STACK_POP, and QF_STACK_PUSH.

Referenced by FIX::SocketConnection::unsignal().

00194 { QF_STACK_PUSH(SocketMonitor::unsignal)
00195 
00196   Sockets::iterator i = m_writeSockets.find( s );
00197   if( i == m_writeSockets.end() ) return;
00198 
00199   m_writeSockets.erase( s );
00200 
00201   QF_STACK_POP
00202 }


Member Data Documentation

Definition at line 89 of file SocketMonitor.h.

Referenced by addConnect(), block(), drop(), processExceptSet(), processWriteSet(), and sleepIfEmpty().

Definition at line 92 of file SocketMonitor.h.

Referenced by block(), and drop().

Definition at line 88 of file SocketMonitor.h.

Referenced by processReadSet(), and SocketMonitor().

Definition at line 87 of file SocketMonitor.h.

Referenced by signal(), SocketMonitor(), and ~SocketMonitor().

clock_t FIX::SocketMonitor::m_ticks [private]

Definition at line 84 of file SocketMonitor.h.

Referenced by getTimeval(), and SocketMonitor().

Definition at line 81 of file SocketMonitor.h.

Referenced by getTimeval(), and sleepIfEmpty().

timeval FIX::SocketMonitor::m_timeval [private]

Definition at line 82 of file SocketMonitor.h.

Referenced by getTimeval(), and SocketMonitor().

Definition at line 91 of file SocketMonitor.h.

Referenced by addWrite(), block(), drop(), processWriteSet(), sleepIfEmpty(), and unsignal().


The documentation for this class was generated from the following files:

Generated on Mon Apr 5 21:00:11 2010 for QuickFIX by doxygen 1.6.1 written by Dimitri van Heesch, © 1997-2001