MySQLStore.cpp
Go to the documentation of this file.
1 /****************************************************************************
2 ** Copyright (c) 2001-2014
3 **
4 ** This file is part of the QuickFIX FIX Engine
5 **
6 ** This file may be distributed under the terms of the quickfixengine.org
7 ** license as defined by quickfixengine.org and appearing in the file
8 ** LICENSE included in the packaging of this file.
9 **
10 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
11 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
12 **
13 ** See http://www.quickfixengine.org/LICENSE for licensing information.
14 **
15 ** Contact ask@quickfixengine.org if any conditions of this licensing are
16 ** not clear to you.
17 **
18 ****************************************************************************/
19 
20 #ifdef _MSC_VER
21 #include "stdafx.h"
22 #else
23 #include "config.h"
24 #endif
25 
26 #ifdef HAVE_MYSQL
27 
28 #include "MySQLStore.h"
29 #include "SessionID.h"
30 #include "SessionSettings.h"
31 #include "FieldConvertors.h"
32 #include "Parser.h"
33 #include "Utility.h"
34 #include "strptime.h"
35 #include <fstream>
36 
37 namespace FIX
38 {
39 
40 const std::string MySQLStoreFactory::DEFAULT_DATABASE = "quickfix";
41 const std::string MySQLStoreFactory::DEFAULT_USER = "";
42 const std::string MySQLStoreFactory::DEFAULT_PASSWORD = "";
43 const std::string MySQLStoreFactory::DEFAULT_HOST = "localhost";
44 const short MySQLStoreFactory::DEFAULT_PORT = 3306;
45 
46 MySQLStore::MySQLStore
47 ( const SessionID& s, const DatabaseConnectionID& d, MySQLConnectionPool* p )
48  : m_pConnectionPool( p ), m_sessionID( s )
49 {
50  m_pConnection = m_pConnectionPool->create( d );
51  populateCache();
52 }
53 
54 MySQLStore::MySQLStore
55 ( const SessionID& s, const std::string& database, const std::string& user,
56  const std::string& password, const std::string& host, short port )
57  : m_pConnectionPool( 0 ), m_sessionID( s )
58 {
59  m_pConnection = new MySQLConnection( database, user, password, host, port );
60  populateCache();
61 }
62 
63 MySQLStore::~MySQLStore()
64 {
65  if( m_pConnectionPool )
66  m_pConnectionPool->destroy( m_pConnection );
67  else
68  delete m_pConnection;
69 }
70 
71 void MySQLStore::populateCache()
72 {
73  std::stringstream queryString;
74 
75  queryString << "SELECT creation_time, incoming_seqnum, outgoing_seqnum FROM sessions WHERE "
76  << "beginstring=" << "\"" << m_sessionID.getBeginString().getValue() << "\" and "
77  << "sendercompid=" << "\"" << m_sessionID.getSenderCompID().getValue() << "\" and "
78  << "targetcompid=" << "\"" << m_sessionID.getTargetCompID().getValue() << "\" and "
79  << "session_qualifier=" << "\"" << m_sessionID.getSessionQualifier() << "\"";
80 
81  MySQLQuery query( queryString.str() );
82  if( !m_pConnection->execute(query) )
83  throw ConfigError( "No entries found for session in database" );
84 
85  int rows = query.rows();
86  if( rows > 1 )
87  throw ConfigError( "Multiple entries found for session in database" );
88 
89  if( rows == 1 )
90  {
91  struct tm time;
92  std::string sqlTime = query.getValue( 0, 0 );
93  strptime( sqlTime.c_str(), "%Y-%m-%d %H:%M:%S", &time );
94  m_cache.setCreationTime (UtcTimeStamp (&time));
95  m_cache.setNextTargetMsgSeqNum( atol( query.getValue( 0, 1 ) ) );
96  m_cache.setNextSenderMsgSeqNum( atol( query.getValue( 0, 2 ) ) );
97  }
98  else
99  {
100  UtcTimeStamp time = m_cache.getCreationTime();
101  char sqlTime[ 20 ];
102  int year, month, day, hour, minute, second, millis;
103  time.getYMD (year, month, day);
104  time.getHMS (hour, minute, second, millis);
105  STRING_SPRINTF( sqlTime, "%d-%02d-%02d %02d:%02d:%02d",
106  year, month, day, hour, minute, second );
107  std::stringstream queryString2;
108  queryString2 << "INSERT INTO sessions (beginstring, sendercompid, targetcompid, session_qualifier,"
109  << "creation_time, incoming_seqnum, outgoing_seqnum) VALUES("
110  << "\"" << m_sessionID.getBeginString().getValue() << "\","
111  << "\"" << m_sessionID.getSenderCompID().getValue() << "\","
112  << "\"" << m_sessionID.getTargetCompID().getValue() << "\","
113  << "\"" << m_sessionID.getSessionQualifier() << "\","
114  << "'" << sqlTime << "',"
115  << m_cache.getNextTargetMsgSeqNum() << ","
116  << m_cache.getNextSenderMsgSeqNum() << ")";
117 
118  MySQLQuery query2( queryString2.str() );
119  if( !m_pConnection->execute(query2) )
120  throw ConfigError( "Unable to create session in database" );
121  }
122 }
123 
124 MessageStore* MySQLStoreFactory::create( const SessionID& s )
125 {
126  if( m_useSettings )
127  return create( s, m_settings.get(s) );
128  else if( m_useDictionary )
129  return create( s, m_dictionary );
130  else
131  {
132  DatabaseConnectionID id( m_database, m_user, m_password, m_host, m_port );
133  return new MySQLStore( s, id, m_connectionPoolPtr.get() );
134  }
135 }
136 
137 MessageStore* MySQLStoreFactory::create( const SessionID& s, const Dictionary& settings )
138 {
139  std::string database = DEFAULT_DATABASE;
140  std::string user = DEFAULT_USER;
141  std::string password = DEFAULT_PASSWORD;
142  std::string host = DEFAULT_HOST;
143  short port = DEFAULT_PORT;
144 
145  try { database = settings.getString( MYSQL_STORE_DATABASE ); }
146  catch( ConfigError& ) {}
147 
148  try { user = settings.getString( MYSQL_STORE_USER ); }
149  catch( ConfigError& ) {}
150 
151  try { password = settings.getString( MYSQL_STORE_PASSWORD ); }
152  catch( ConfigError& ) {}
153 
154  try { host = settings.getString( MYSQL_STORE_HOST ); }
155  catch( ConfigError& ) {}
156 
157  try { port = ( short ) settings.getInt( MYSQL_STORE_PORT ); }
158  catch( ConfigError& ) {}
159 
160  DatabaseConnectionID id( database, user, password, host, port );
161  return new MySQLStore( s, id, m_connectionPoolPtr.get() );
162 }
163 
164 void MySQLStoreFactory::destroy( MessageStore* pStore )
165 {
166  delete pStore;
167 }
168 
169 bool MySQLStore::set( int msgSeqNum, const std::string& msg )
170 throw ( IOException )
171 {
172  char* msgCopy = new char[ (msg.size() * 2) + 1 ];
173  mysql_escape_string( msgCopy, msg.c_str(), msg.size() );
174 
175  std::stringstream queryString;
176  queryString << "INSERT INTO messages "
177  << "(beginstring, sendercompid, targetcompid, session_qualifier, msgseqnum, message) "
178  << "VALUES ("
179  << "\"" << m_sessionID.getBeginString().getValue() << "\","
180  << "\"" << m_sessionID.getSenderCompID().getValue() << "\","
181  << "\"" << m_sessionID.getTargetCompID().getValue() << "\","
182  << "\"" << m_sessionID.getSessionQualifier() << "\","
183  << msgSeqNum << ","
184  << "\"" << msgCopy << "\")";
185 
186  delete [] msgCopy;
187 
188  MySQLQuery query( queryString.str() );
189  if( !m_pConnection->execute(query) )
190  {
191  std::stringstream queryString2;
192  queryString2 << "UPDATE messages SET message=\"" << msg << "\" WHERE "
193  << "beginstring=" << "\"" << m_sessionID.getBeginString().getValue() << "\" and "
194  << "sendercompid=" << "\"" << m_sessionID.getSenderCompID().getValue() << "\" and "
195  << "targetcompid=" << "\"" << m_sessionID.getTargetCompID().getValue() << "\" and "
196  << "session_qualifier=" << "\"" << m_sessionID.getSessionQualifier() << "\" and "
197  << "msgseqnum=" << msgSeqNum;
198  MySQLQuery query2( queryString2.str() );
199  if( !m_pConnection->execute(query2) )
200  query2.throwException();
201  }
202  return true;
203 }
204 
205 void MySQLStore::get( int begin, int end,
206  std::vector < std::string > & result ) const
207 throw ( IOException )
208 {
209  result.clear();
210  std::stringstream queryString;
211  queryString << "SELECT message FROM messages WHERE "
212  << "beginstring=" << "\"" << m_sessionID.getBeginString().getValue() << "\" and "
213  << "sendercompid=" << "\"" << m_sessionID.getSenderCompID().getValue() << "\" and "
214  << "targetcompid=" << "\"" << m_sessionID.getTargetCompID().getValue() << "\" and "
215  << "session_qualifier=" << "\"" << m_sessionID.getSessionQualifier() << "\" and "
216  << "msgseqnum>=" << begin << " and " << "msgseqnum<=" << end << " "
217  << "ORDER BY msgseqnum";
218 
219  MySQLQuery query( queryString.str() );
220  if( !m_pConnection->execute(query) )
221  query.throwException();
222 
223  int rows = query.rows();
224  for( int row = 0; row < rows; row++ )
225  result.push_back( query.getValue( row, 0 ) );
226 }
227 
228 int MySQLStore::getNextSenderMsgSeqNum() const throw ( IOException )
229 {
230  return m_cache.getNextSenderMsgSeqNum();
231 }
232 
233 int MySQLStore::getNextTargetMsgSeqNum() const throw ( IOException )
234 {
235  return m_cache.getNextTargetMsgSeqNum();
236 }
237 
238 void MySQLStore::setNextSenderMsgSeqNum( int value ) throw ( IOException )
239 {
240  std::stringstream queryString;
241  queryString << "UPDATE sessions SET outgoing_seqnum=" << value << " WHERE "
242  << "beginstring=" << "\"" << m_sessionID.getBeginString().getValue() << "\" and "
243  << "sendercompid=" << "\"" << m_sessionID.getSenderCompID().getValue() << "\" and "
244  << "targetcompid=" << "\"" << m_sessionID.getTargetCompID().getValue() << "\" and "
245  << "session_qualifier=" << "\"" << m_sessionID.getSessionQualifier() << "\"";
246  MySQLQuery query( queryString.str() );
247  if( !m_pConnection->execute(query) )
248  query.throwException();
249  m_cache.setNextSenderMsgSeqNum( value );
250 }
251 
252 void MySQLStore::setNextTargetMsgSeqNum( int value ) throw ( IOException )
253 {
254  std::stringstream queryString;
255  queryString << "UPDATE sessions SET incoming_seqnum=" << value << " WHERE "
256  << "beginstring=" << "\"" << m_sessionID.getBeginString().getValue() << "\" and "
257  << "sendercompid=" << "\"" << m_sessionID.getSenderCompID().getValue() << "\" and "
258  << "targetcompid=" << "\"" << m_sessionID.getTargetCompID().getValue() << "\" and "
259  << "session_qualifier=" << "\"" << m_sessionID.getSessionQualifier() << "\"";
260 
261  MySQLQuery query( queryString.str() );
262  if( !m_pConnection->execute(query) )
263  query.throwException();
264 
265  m_cache.setNextTargetMsgSeqNum( value );
266 }
267 
268 void MySQLStore::incrNextSenderMsgSeqNum() throw ( IOException )
269 {
270  m_cache.incrNextSenderMsgSeqNum();
271  setNextSenderMsgSeqNum( m_cache.getNextSenderMsgSeqNum() );
272 }
273 
274 void MySQLStore::incrNextTargetMsgSeqNum() throw ( IOException )
275 {
276  m_cache.incrNextTargetMsgSeqNum();
277  setNextTargetMsgSeqNum( m_cache.getNextTargetMsgSeqNum() );
278 }
279 
280 UtcTimeStamp MySQLStore::getCreationTime() const throw ( IOException )
281 {
282  return m_cache.getCreationTime();
283 }
284 
285 void MySQLStore::reset() throw ( IOException )
286 {
287  std::stringstream queryString;
288  queryString << "DELETE FROM messages WHERE "
289  << "beginstring=" << "\"" << m_sessionID.getBeginString().getValue() << "\" and "
290  << "sendercompid=" << "\"" << m_sessionID.getSenderCompID().getValue() << "\" and "
291  << "targetcompid=" << "\"" << m_sessionID.getTargetCompID().getValue() << "\" and "
292  << "session_qualifier=" << "\"" << m_sessionID.getSessionQualifier() << "\"";
293 
294  MySQLQuery query( queryString.str() );
295  if( !m_pConnection->execute(query) )
296  query.throwException();
297 
298  m_cache.reset();
299  UtcTimeStamp time = m_cache.getCreationTime();
300 
301  int year, month, day, hour, minute, second, millis;
302  time.getYMD( year, month, day );
303  time.getHMS( hour, minute, second, millis );
304 
305  char sqlTime[ 20 ];
306  STRING_SPRINTF( sqlTime, "%d-%02d-%02d %02d:%02d:%02d",
307  year, month, day, hour, minute, second );
308 
309  std::stringstream queryString2;
310  queryString2 << "UPDATE sessions SET creation_time='" << sqlTime << "', "
311  << "incoming_seqnum=" << m_cache.getNextTargetMsgSeqNum() << ", "
312  << "outgoing_seqnum=" << m_cache.getNextSenderMsgSeqNum() << " WHERE "
313  << "beginstring=" << "\"" << m_sessionID.getBeginString().getValue() << "\" and "
314  << "sendercompid=" << "\"" << m_sessionID.getSenderCompID().getValue() << "\" and "
315  << "targetcompid=" << "\"" << m_sessionID.getTargetCompID().getValue() << "\" and "
316  << "session_qualifier=" << "\"" << m_sessionID.getSessionQualifier() << "\"";
317 
318  MySQLQuery query2( queryString2.str() );
319  if( !m_pConnection->execute(query2) )
320  query2.throwException();
321 }
322 
323 void MySQLStore::refresh() throw ( IOException )
324 {
325  m_cache.reset();
326  populateCache();
327 }
328 
329 }
330 
331 #endif
SessionID.h
MySQLStore.h
SessionSettings.h
FIX::MYSQL_STORE_USER
const char MYSQL_STORE_USER[]
Definition: SessionSettings.h:100
FIX::MYSQL_STORE_PASSWORD
const char MYSQL_STORE_PASSWORD[]
Definition: SessionSettings.h:101
FIX::TYPE::UtcTimeStamp
@ UtcTimeStamp
Definition: FieldTypes.h:940
FieldConvertors.h
Parser.h
STRING_SPRINTF
#define STRING_SPRINTF
Definition: Utility.h:222
FIX
Definition: Acceptor.cpp:34
FIX::MYSQL_STORE_PORT
const char MYSQL_STORE_PORT[]
Definition: SessionSettings.h:103
FIX::MYSQL_STORE_DATABASE
const char MYSQL_STORE_DATABASE[]
Definition: SessionSettings.h:99
FIX::MYSQL_STORE_HOST
const char MYSQL_STORE_HOST[]
Definition: SessionSettings.h:102
Utility.h

Generated on Wed Apr 29 2020 19:41:30 for QuickFIX by doxygen 1.8.17 written by Dimitri van Heesch, © 1997-2001