ThreadedSSLSocketInitiator.cpp
Go to the documentation of this file.
1 /* ====================================================================
2  * Copyright (c) 1998-2006 Ralf S. Engelschall. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  *
8  * 1. Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  *
11  * 2. Redistributions in binary form must reproduce the above copyright
12  * notice, this list of conditions and the following
13  * disclaimer in the documentation and/or other materials
14  * provided with the distribution.
15  *
16  * 3. All advertising materials mentioning features or use of this
17  * software must display the following acknowledgment:
18  * "This product includes software developed by
19  * Ralf S. Engelschall <rse@engelschall.com> for use in the
20  * mod_ssl project (http://www.modssl.org/)."
21  *
22  * 4. The names "mod_ssl" must not be used to endorse or promote
23  * products derived from this software without prior written
24  * permission. For written permission, please contact
25  * rse@engelschall.com.
26  *
27  * 5. Products derived from this software may not be called "mod_ssl"
28  * nor may "mod_ssl" appear in their names without prior
29  * written permission of Ralf S. Engelschall.
30  *
31  * 6. Redistributions of any form whatsoever must retain the following
32  * acknowledgment:
33  * "This product includes software developed by
34  * Ralf S. Engelschall <rse@engelschall.com> for use in the
35  * mod_ssl project (http://www.modssl.org/)."
36  *
37  * THIS SOFTWARE IS PROVIDED BY RALF S. ENGELSCHALL ``AS IS'' AND ANY
38  * EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
39  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
40  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL RALF S. ENGELSCHALL OR
41  * HIS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
42  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
43  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
44  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
45  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
46  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
47  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
48  * OF THE POSSIBILITY OF SUCH DAMAGE.
49  * ====================================================================
50  */
51 
52 /* ====================================================================
53  * Copyright (c) 1995-1999 Ben Laurie. All rights reserved.
54  *
55  * Redistribution and use in source and binary forms, with or without
56  * modification, are permitted provided that the following conditions
57  * are met:
58  *
59  * 1. Redistributions of source code must retain the above copyright
60  * notice, this list of conditions and the following disclaimer.
61  *
62  * 2. Redistributions in binary form must reproduce the above copyright
63  * notice, this list of conditions and the following disclaimer in
64  * the documentation and/or other materials provided with the
65  * distribution.
66  *
67  * 3. All advertising materials mentioning features or use of this
68  * software must display the following acknowledgment:
69  * "This product includes software developed by Ben Laurie
70  * for use in the Apache-SSL HTTP server project."
71  *
72  * 4. The name "Apache-SSL Server" must not be used to
73  * endorse or promote products derived from this software without
74  * prior written permission.
75  *
76  * 5. Redistributions of any form whatsoever must retain the following
77  * acknowledgment:
78  * "This product includes software developed by Ben Laurie
79  * for use in the Apache-SSL HTTP server project."
80  *
81  * THIS SOFTWARE IS PROVIDED BY BEN LAURIE ``AS IS'' AND ANY
82  * EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
83  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
84  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL BEN LAURIE OR
85  * HIS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
86  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
87  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
88  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
89  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
90  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
91  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
92  * OF THE POSSIBILITY OF SUCH DAMAGE.
93  * ====================================================================
94  */
95 /****************************************************************************
96 ** Copyright (c) 2001-2014
97 **
98 ** This file is part of the QuickFIX FIX Engine
99 **
100 ** This file may be distributed under the terms of the quickfixengine.org
101 ** license as defined by quickfixengine.org and appearing in the file
102 ** LICENSE included in the packaging of this file.
103 **
104 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
105 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
106 **
107 ** See http://www.quickfixengine.org/LICENSE for licensing information.
108 **
109 ** Contact ask@quickfixengine.org if any conditions of this licensing are
110 ** not clear to you.
111 **
112 ****************************************************************************/
113 
114 #ifdef _MSC_VER
115 #include "stdafx.h"
116 #else
117 #include "config.h"
118 #endif
119 
120 #if (HAVE_SSL > 0)
121 
122 #include "UtilitySSL.h"
124 #include "Session.h"
125 #include "Settings.h"
126 
127 namespace FIX
128 {
129 FIX::ThreadedSSLSocketInitiator *initObjT = 0;
130 
131 int ThreadedSSLSocketInitiator::passwordHandleCB(char *buf, int bufsize, int verify, void *job)
132 {
133  return initObjT->passwordHandleCallback(buf, bufsize, verify, job);
134 }
135 
136 ThreadedSSLSocketInitiator::ThreadedSSLSocketInitiator(
137  Application &application, MessageStoreFactory &factory,
138  const SessionSettings &settings) throw(ConfigError)
139  : Initiator(application, factory, settings), m_lastConnect(0),
140  m_reconnectInterval(30), m_noDelay(false), m_sendBufSize(0),
141  m_rcvBufSize(0), m_sslInit(false), m_ctx(0), m_cert(0), m_key(0)
142 {
143  socket_init();
144  initObjT = this;
145 }
146 
147 ThreadedSSLSocketInitiator::ThreadedSSLSocketInitiator(
148  Application &application, MessageStoreFactory &factory,
149  const SessionSettings &settings, LogFactory &logFactory) throw(ConfigError)
150  : Initiator(application, factory, settings, logFactory), m_lastConnect(0),
151  m_reconnectInterval(30), m_noDelay(false), m_sendBufSize(0),
152  m_rcvBufSize(0), m_sslInit(false), m_ctx(0), m_cert(0), m_key(0)
153 {
154  socket_init();
155  initObjT = this;
156 }
157 
158 ThreadedSSLSocketInitiator::~ThreadedSSLSocketInitiator()
159 {
160  if (m_sslInit)
161  {
162  SSL_CTX_free(m_ctx);
163  m_ctx = 0;
164  ssl_term();
165  }
166 
167  socket_term();
168 }
169 
170 void ThreadedSSLSocketInitiator::onConfigure(const SessionSettings &s) throw(
171  ConfigError)
172 {
173  const Dictionary &dict = s.get();
174 
175  if (dict.has(RECONNECT_INTERVAL))
176  m_reconnectInterval = dict.getInt(RECONNECT_INTERVAL);
177  if (dict.has(SOCKET_NODELAY))
178  m_noDelay = dict.getBool(SOCKET_NODELAY);
179  if (dict.has(SOCKET_SEND_BUFFER_SIZE))
180  m_sendBufSize = dict.getInt(SOCKET_SEND_BUFFER_SIZE);
181  if (dict.has(SOCKET_RECEIVE_BUFFER_SIZE))
182  m_rcvBufSize = dict.getInt(SOCKET_RECEIVE_BUFFER_SIZE);
183 }
184 
185 void ThreadedSSLSocketInitiator::onInitialize(const SessionSettings &s) throw(
186  RuntimeError)
187 {
188  if (m_sslInit)
189  return;
190 
191  ssl_init();
192 
193  std::string errStr;
194 
195  /* set up the application context */
196  if ((m_ctx = createSSLContext(false, m_settings, errStr)) == 0)
197  {
198  throw RuntimeError(errStr);
199  }
200 
201  if (m_cert && m_key)
202  {
203  if (SSL_CTX_use_certificate(m_ctx, m_cert) < 1)
204  {
205  ssl_term();
206  throw RuntimeError("Failed to set certificate");
207  }
208 
209  if (SSL_CTX_use_RSAPrivateKey(m_ctx, m_key) <= 0)
210  {
211  ssl_term();
212  throw RuntimeError("Failed to set key");
213  }
214  }
215  else if (!loadSSLCert(m_ctx, false, m_settings, getLog(), ThreadedSSLSocketInitiator::passwordHandleCB, errStr))
216  {
217  ssl_term();
218  throw RuntimeError(errStr);
219  }
220 
221  int verifyLevel;
222  if (!loadCAInfo(m_ctx, false, m_settings, getLog(), errStr, verifyLevel))
223  {
224  ssl_term();
225  throw RuntimeError(errStr);
226  }
227 
228  m_sslInit = true;
229 }
230 
231 void ThreadedSSLSocketInitiator::onStart()
232 {
233  while (!isStopped())
234  {
235  time_t now;
236  ::time(&now);
237 
238  if ((now - m_lastConnect) >= m_reconnectInterval)
239  {
240  Locker l(m_mutex);
241  connect();
242  m_lastConnect = now;
243  }
244 
245  process_sleep(1);
246  }
247 }
248 
249 bool ThreadedSSLSocketInitiator::onPoll(double timeout) { return false; }
250 
251 void ThreadedSSLSocketInitiator::onStop()
252 {
253  SocketToThread threads;
254  SocketToThread::iterator i;
255 
256  {
257  Locker l(m_mutex);
258 
259  time_t start = 0;
260  time_t now = 0;
261 
262  ::time(&start);
263  while (isLoggedOn())
264  {
265  if (::time(&now) - 5 >= start)
266  break;
267  }
268 
269  threads = m_threads;
270  m_threads.clear();
271  }
272 
273  for (i = threads.begin(); i != threads.end(); ++i)
274  ssl_socket_close(i->first.first, i->first.second);
275 
276  for (i = threads.begin(); i != threads.end(); ++i)
277  {
278  thread_join(i->second);
279  if (i->first.second != 0)
280  SSL_free(i->first.second);
281  }
282  threads.clear();
283 }
284 
285 void ThreadedSSLSocketInitiator::doConnect(const SessionID &s,
286  const Dictionary &d)
287 {
288  try
289  {
290  Session *session = Session::lookupSession(s);
291  if (!session->isSessionTime(UtcTimeStamp()))
292  return;
293 
294  Log *log = session->getLog();
295 
296  std::string address;
297  short port = 0;
298  getHost(s, d, address, port);
299 
300  int socket = socket_createConnector();
301  if (m_noDelay)
302  socket_setsockopt(socket, TCP_NODELAY);
303  if (m_sendBufSize)
304  socket_setsockopt(socket, SO_SNDBUF, m_sendBufSize);
305  if (m_rcvBufSize)
306  socket_setsockopt(socket, SO_RCVBUF, m_rcvBufSize);
307 
308  setPending(s);
309  log->onEvent("Connecting to " + address + " on port " +
310  IntConvertor::convert((unsigned short)port));
311 
312  SSL *ssl = SSL_new(m_ctx);
313  if (ssl == 0)
314  {
315  log->onEvent("Failed to create ssl object");
316  return;
317  }
318  SSL_clear(ssl);
319  BIO *sbio = BIO_new_socket(socket, BIO_CLOSE);
320  SSL_set_bio(ssl, sbio, sbio);
321 
322  ThreadedSSLSocketConnection *pConnection = new ThreadedSSLSocketConnection(
323  s, socket, ssl, address, port, getLog());
324 
325  ThreadPair *pair = new ThreadPair(this, pConnection);
326 
327  {
328  Locker l(m_mutex);
329  thread_id thread;
330  if (thread_spawn(&socketThread, pair, thread))
331  {
332  addThread(SocketKey(socket, ssl), thread);
333  }
334  else
335  {
336  delete pair;
337  pConnection->disconnect();
338  delete pConnection;
339  SSL_free(ssl);
340  setDisconnected(s);
341  }
342  }
343  }
344  catch (std::exception &)
345  {
346  }
347 }
348 
349 void ThreadedSSLSocketInitiator::addThread(SocketKey s, thread_id t)
350 {
351  Locker l(m_mutex);
352 
353  m_threads[s] = t;
354 }
355 
356 void ThreadedSSLSocketInitiator::removeThread(SocketKey s)
357 {
358  Locker l(m_mutex);
359  SocketToThread::iterator i = m_threads.find(s);
360 
361  if (i != m_threads.end())
362  {
363  thread_detach(i->second);
364  if (i->first.second != 0)
365  SSL_free(i->first.second);
366  m_threads.erase(i);
367  }
368 }
369 
370 THREAD_PROC ThreadedSSLSocketInitiator::socketThread(void *p)
371 {
372  ThreadPair *pair = reinterpret_cast< ThreadPair * >(p);
373 
374  ThreadedSSLSocketInitiator *pInitiator = pair->first;
375  ThreadedSSLSocketConnection *pConnection = pair->second;
376  FIX::SessionID sessionID = pConnection->getSession()->getSessionID();
377  FIX::Session *pSession = FIX::Session::lookupSession(sessionID);
378  int socket = pConnection->getSocket();
379  delete pair;
380 
381  pInitiator->lock();
382 
383  if (!pConnection->connect())
384  {
385  pInitiator->getLog()->onEvent("Connection failed");
386  pConnection->disconnect();
387  SSL *ssl = pConnection->sslObject();
388  delete pConnection;
389  pInitiator->removeThread(SocketKey(socket, ssl));
390  pInitiator->setDisconnected(sessionID);
391  return 0;
392  }
393 
394  // Do the SSL handshake.
395  int rc = SSL_connect(pConnection->sslObject());
396  if (rc <= 0)
397  {
398  int err = SSL_get_error(pConnection->sslObject(), rc);
399  pInitiator->getLog()->onEvent("SSL_connect failed with SSL error " + IntConvertor::convert(err));
400  pConnection->disconnect();
401  SSL *ssl = pConnection->sslObject();
402  delete pConnection;
403  pInitiator->removeThread(SocketKey(socket, ssl));
404  pInitiator->setDisconnected(sessionID);
405  return 0;
406  }
407 
408  pInitiator->setConnected(sessionID);
409  pInitiator->getLog()->onEvent("Connection succeeded");
410 
411  pSession->next();
412 
413  while (pConnection->read())
414  {
415  }
416 
417  SSL *ssl = pConnection->sslObject();
418  delete pConnection;
419  if (!pInitiator->isStopped())
420  pInitiator->removeThread(SocketKey(socket, ssl));
421 
422  pInitiator->setDisconnected(sessionID);
423  return 0;
424 }
425 
426 void ThreadedSSLSocketInitiator::getHost(const SessionID &s,
427  const Dictionary &d,
428  std::string &address, short &port)
429 {
430  int num = 0;
431  SessionToHostNum::iterator i = m_sessionToHostNum.find(s);
432  if (i != m_sessionToHostNum.end())
433  num = i->second;
434 
435  std::stringstream hostStream;
436  hostStream << SOCKET_CONNECT_HOST << num;
437  std::string hostString = hostStream.str();
438 
439  std::stringstream portStream;
440  portStream << SOCKET_CONNECT_PORT << num;
441  std::string portString = portStream.str();
442 
443  if (d.has(hostString) && d.has(portString))
444  {
445  address = d.getString(hostString);
446  port = (short)d.getInt(portString);
447  }
448  else
449  {
450  num = 0;
451  address = d.getString(SOCKET_CONNECT_HOST);
452  port = (short)d.getInt(SOCKET_CONNECT_PORT);
453  }
454 
455  m_sessionToHostNum[s] = ++num;
456 }
457 
458 int ThreadedSSLSocketInitiator::passwordHandleCallback(char *buf, size_t bufsize,
459  int verify, void *job)
460 {
461  if (m_password.length() > bufsize)
462  return -1;
463 
464  std::strcpy(buf, m_password.c_str());
465  return m_password.length();
466 }
467 }
468 
469 #endif
FIX::thread_id
pthread_t thread_id
Definition: Utility.h:190
FIX::SOCKET_RECEIVE_BUFFER_SIZE
const char SOCKET_RECEIVE_BUFFER_SIZE[]
Definition: SessionSettings.h:87
FIX::Session::lookupSession
static Session * lookupSession(const SessionID &)
Definition: Session.cpp:1513
FIX::SOCKET_NODELAY
const char SOCKET_NODELAY[]
Definition: SessionSettings.h:85
FIX::socket_setsockopt
int socket_setsockopt(int s, int opt)
Definition: Utility.cpp:225
UtilitySSL.h
FIX::socket_init
void socket_init()
Definition: Utility.cpp:98
FIX::SOCKET_SEND_BUFFER_SIZE
const char SOCKET_SEND_BUFFER_SIZE[]
Definition: SessionSettings.h:86
FIX::thread_detach
void thread_detach(thread_id thread)
Definition: Utility.cpp:464
FIX::socket_term
void socket_term()
Definition: Utility.cpp:113
FIX::SessionID
Unique session id consists of BeginString, SenderCompID and TargetCompID.
Definition: SessionID.h:47
FIX::Session::next
void next()
Definition: Session.cpp:142
FIX::TYPE::UtcTimeStamp
@ UtcTimeStamp
Definition: FieldTypes.h:940
THREAD_PROC
#define THREAD_PROC
Definition: Utility.h:184
FIX::IntConvertor::convert
static std::string convert(signed_int value)
Definition: FieldConvertors.h:170
FIX::RECONNECT_INTERVAL
const char RECONNECT_INTERVAL[]
Definition: SessionSettings.h:88
FIX::socket_createConnector
int socket_createConnector()
Definition: Utility.cpp:160
ThreadedSSLSocketInitiator.h
Settings.h
FIX::process_sleep
void process_sleep(double s)
Definition: Utility.cpp:483
FIX::SOCKET_CONNECT_PORT
const char SOCKET_CONNECT_PORT[]
Definition: SessionSettings.h:82
FIX
Definition: Acceptor.cpp:34
Session.h
FIX::thread_join
void thread_join(thread_id thread)
Definition: Utility.cpp:454
FIX::thread_spawn
bool thread_spawn(THREAD_START_ROUTINE func, void *var, thread_id &thread)
Definition: Utility.cpp:433
FIX::Session
Maintains the state and implements the logic of a FIX session.
Definition: Session.h:62
FIX::SOCKET_CONNECT_HOST
const char SOCKET_CONNECT_HOST[]
Definition: SessionSettings.h:81

Generated on Wed Apr 29 2020 19:41:30 for QuickFIX by doxygen 1.8.17 written by Dimitri van Heesch, © 1997-2001