SocketInitiator.cpp
Go to the documentation of this file.
1/****************************************************************************
2** Copyright (c) 2001-2014
3**
4** This file is part of the QuickFIX FIX Engine
5**
6** This file may be distributed under the terms of the quickfixengine.org
7** license as defined by quickfixengine.org and appearing in the file
8** LICENSE included in the packaging of this file.
9**
10** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
11** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
12**
13** See http://www.quickfixengine.org/LICENSE for licensing information.
14**
15** Contact ask@quickfixengine.org if any conditions of this licensing are
16** not clear to you.
17**
18****************************************************************************/
19
20#ifdef _MSC_VER
21#include "stdafx.h"
22#else
23#include "config.h"
24#endif
25
26#include "SocketInitiator.h"
27#include "Session.h"
28#include "Settings.h"
29
30namespace FIX
31{
33 MessageStoreFactory& factory,
34 const SessionSettings& settings )
35throw( ConfigError )
36: Initiator( application, factory, settings ),
37 m_connector( 1 ), m_lastConnect( 0 ),
38 m_reconnectInterval( 30 ), m_noDelay( false ), m_sendBufSize( 0 ),
39 m_rcvBufSize( 0 )
40{
41}
42
44 MessageStoreFactory& factory,
45 const SessionSettings& settings,
46 LogFactory& logFactory )
47throw( ConfigError )
48: Initiator( application, factory, settings, logFactory ),
49 m_connector( 1 ), m_lastConnect( 0 ),
50 m_reconnectInterval( 30 ), m_noDelay( false ), m_sendBufSize( 0 ),
51 m_rcvBufSize( 0 )
52{
53}
54
56{
57 SocketConnections::iterator i;
58 for (i = m_connections.begin();
59 i != m_connections.end(); ++i)
60 delete i->second;
61
62 for (i = m_pendingConnections.begin();
63 i != m_pendingConnections.end(); ++i)
64 delete i->second;
65}
66
68throw ( ConfigError )
69{
70 const Dictionary& dict = s.get();
71
72 if( dict.has( RECONNECT_INTERVAL ) )
73 m_reconnectInterval = dict.getInt( RECONNECT_INTERVAL );
74 if( dict.has( SOCKET_NODELAY ) )
75 m_noDelay = dict.getBool( SOCKET_NODELAY );
76 if( dict.has( SOCKET_SEND_BUFFER_SIZE ) )
77 m_sendBufSize = dict.getInt( SOCKET_SEND_BUFFER_SIZE );
79 m_rcvBufSize = dict.getInt( SOCKET_RECEIVE_BUFFER_SIZE );
80}
81
83throw ( RuntimeError )
84{
85}
86
88{
89 connect();
90
91 while ( !isStopped() ) {
92 m_connector.block( *this, false, 1.0 );
94 }
95
96 time_t start = 0;
97 time_t now = 0;
98
99 ::time( &start );
100 while ( isLoggedOn() )
101 {
102 m_connector.block( *this );
103 if( ::time(&now) -5 >= start )
104 break;
105 }
106}
107
108bool SocketInitiator::onPoll( double timeout )
109{
110 time_t start = 0;
111 time_t now = 0;
112
113 if( isStopped() )
114 {
115 if( start == 0 )
116 ::time( &start );
117 if( !isLoggedOn() )
118 return false;
119 if( ::time(&now) - 5 >= start )
120 return false;
121 }
122
123 m_connector.block( *this, true, timeout );
124 return true;
125}
126
128{
129}
130
132{
133 try
134 {
135 std::string address;
136 short port = 0;
137 std::string sourceAddress;
138 short sourcePort = 0;
139
140 Session* session = Session::lookupSession( s );
141 if( !session->isSessionTime(UtcTimeStamp()) ) return;
142
143 Log* log = session->getLog();
144
145 getHost( s, d, address, port, sourceAddress, sourcePort );
146
147 log->onEvent( "Connecting to " + address + " on port " + IntConvertor::convert((unsigned short)port) + " (Source " + sourceAddress + ":" + IntConvertor::convert((unsigned short)sourcePort) + ")");
148 int result = m_connector.connect( address, port, m_noDelay, m_sendBufSize, m_rcvBufSize, sourceAddress, sourcePort );
149 setPending( s );
150
151 m_pendingConnections[ result ]
152 = new SocketConnection( *this, s, result, &m_connector.getMonitor() );
153 }
154 catch ( std::exception& ) {}
155}
156
158{
159 SocketConnections::iterator i = m_pendingConnections.find( s );
160 if( i == m_pendingConnections.end() ) return;
161 SocketConnection* pSocketConnection = i->second;
162
163 m_connections[s] = pSocketConnection;
164 m_pendingConnections.erase( i );
165 setConnected( pSocketConnection->getSession()->getSessionID() );
166 pSocketConnection->onTimeout();
167}
168
170{
171 SocketConnections::iterator i = m_connections.find( s );
172 if ( i == m_connections.end() ) return ;
173 SocketConnection* pSocketConnection = i->second;
174 if( pSocketConnection->processQueue() )
175 pSocketConnection->unsignal();
176}
177
179{
180 SocketConnections::iterator i = m_connections.find( s );
181 if ( i == m_connections.end() ) return false;
182 SocketConnection* pSocketConnection = i->second;
183 return pSocketConnection->read( connector );
184}
185
187{
188 SocketConnections::iterator i = m_connections.find( s );
189 SocketConnections::iterator j = m_pendingConnections.find( s );
190
191 SocketConnection* pSocketConnection = 0;
192 if( i != m_connections.end() )
193 pSocketConnection = i->second;
194 if( j != m_pendingConnections.end() )
195 pSocketConnection = j->second;
196 if( !pSocketConnection )
197 return;
198
199 setDisconnected( pSocketConnection->getSession()->getSessionID() );
200
201 Session* pSession = pSocketConnection->getSession();
202 if ( pSession )
203 {
204 pSession->disconnect();
205 setDisconnected( pSession->getSessionID() );
206 }
207
208 delete pSocketConnection;
209 m_connections.erase( s );
210 m_pendingConnections.erase( s );
211}
212
214{
215 onTimeout( connector );
216}
217
219{
220 time_t now;
221 ::time( &now );
222
223 if ( (now - m_lastConnect) >= m_reconnectInterval )
224 {
225 connect();
226 m_lastConnect = now;
227 }
228
229 SocketConnections::iterator i;
230 for ( i = m_connections.begin(); i != m_connections.end(); ++i )
231 i->second->onTimeout();
232}
233
235 std::string& address, short& port,
236 std::string& sourceAddress, short& sourcePort)
237{
238 int num = 0;
239 SessionToHostNum::iterator i = m_sessionToHostNum.find( s );
240 if ( i != m_sessionToHostNum.end() ) num = i->second;
241
242 std::stringstream hostStream;
243 hostStream << SOCKET_CONNECT_HOST << num;
244 std::string hostString = hostStream.str();
245
246 std::stringstream portStream;
247 portStream << SOCKET_CONNECT_PORT << num;
248 std::string portString = portStream.str();
249
250 sourcePort = 0;
251 sourceAddress.empty();
252
253 if( d.has(hostString) && d.has(portString) )
254 {
255 address = d.getString( hostString );
256 port = ( short ) d.getInt( portString );
257
258 std::stringstream sourceHostStream;
259 sourceHostStream << SOCKET_CONNECT_SOURCE_HOST << num;
260 hostString = sourceHostStream.str();
261 if( d.has(hostString) )
262 sourceAddress = d.getString( hostString );
263
264 std::stringstream sourcePortStream;
265 sourcePortStream << SOCKET_CONNECT_SOURCE_PORT << num;
266 portString = sourcePortStream.str();
267 if( d.has(portString) )
268 sourcePort = ( short ) d.getInt( portString );
269 }
270 else
271 {
272 num = 0;
273 address = d.getString( SOCKET_CONNECT_HOST );
274 port = ( short ) d.getInt( SOCKET_CONNECT_PORT );
275
277 sourceAddress = d.getString( SOCKET_CONNECT_SOURCE_HOST );
279 sourcePort = ( short ) d.getInt( SOCKET_CONNECT_SOURCE_PORT );
280 }
281
282 m_sessionToHostNum[ s ] = ++num;
283}
284}
This interface must be implemented to define what your FIX application does.
Definition Application.h:44
For storage and retrieval of key/value pairs.
Definition Dictionary.h:37
int getInt(const std::string &) const
Get a value as an int.
bool getBool(const std::string &) const
Get a value as a bool.
bool has(const std::string &) const
Check if the dictionary contains a value for key.
std::string getString(const std::string &, bool capitalize=false) const
Get a value as a string.
Base for classes which act as an initiator for establishing connections.
Definition Initiator.h:52
bool isLoggedOn()
Check to see if any sessions are currently logged on.
void setConnected(const SessionID &)
void setPending(const SessionID &)
void setDisconnected(const SessionID &)
void start()
Start initiator.
bool isStopped()
Definition Initiator.h:83
This interface must be implemented to create a Log.
Definition Log.h:43
This interface must be implemented to log messages and events.
Definition Log.h:82
virtual void onEvent(const std::string &)=0
This interface must be implemented to create a MessageStore.
Maintains the state and implements the logic of a FIX session.
Definition Session.h:46
bool isSessionTime(const UtcTimeStamp &time)
Definition Session.h:108
static Session * lookupSession(const SessionID &)
Definition Session.cpp:1496
const SessionID & getSessionID() const
Definition Session.h:75
Log * getLog()
Definition Session.h:227
void disconnect()
Definition Session.cpp:613
Unique session id consists of BeginString, SenderCompID and TargetCompID.
Definition SessionID.h:31
Container for setting dictionaries mapped to sessions.
Encapsulates a socket file descriptor (single-threaded).
Session * getSession() const
bool read(SocketConnector &s)
Connects sockets to remote ports and addresses.
int connect(const std::string &address, int port, bool noDelay, int sendBufSize, int rcvBufSize, const std::string &sourceAddress="", int sourcePort=0)
void block(Strategy &strategy, bool poll=0, double timeout=0.0)
SocketMonitor & getMonitor()
bool onData(SocketConnector &, int)
void onWrite(SocketConnector &, int)
void onConnect(SocketConnector &, int)
SocketConnections m_pendingConnections
bool onPoll(double timeout)
Implemented to connect and poll for events.
void onError(SocketConnector &)
void onDisconnect(SocketConnector &, int)
SocketInitiator(Application &, MessageStoreFactory &, const SessionSettings &)
void doConnect(const SessionID &, const Dictionary &d)
Implemented to connect a session to its target.
void onConfigure(const SessionSettings &)
Implemented to configure initiator.
SocketConnections m_connections
void onInitialize(const SessionSettings &)
Implemented to initialize initiator.
void onStart()
Implemented to start connecting to targets.
SocketConnector m_connector
SessionToHostNum m_sessionToHostNum
void onTimeout(SocketConnector &)
void getHost(const SessionID &, const Dictionary &, std::string &, short &, std::string &, short &)
void onStop()
Implemented to stop a running initiator.
Date and Time represented in UTC.
Definition FieldTypes.h:583
const char SOCKET_CONNECT_PORT[]
const char SOCKET_SEND_BUFFER_SIZE[]
const char SOCKET_NODELAY[]
const char SOCKET_RECEIVE_BUFFER_SIZE[]
const char RECONNECT_INTERVAL[]
const char SOCKET_CONNECT_HOST[]
const char SOCKET_CONNECT_SOURCE_PORT[]
const char SOCKET_CONNECT_SOURCE_HOST[]
Application is not configured correctly
Definition Exceptions.h:88
static std::string convert(signed_int value)
Application encountered a serious error during runtime
Definition Exceptions.h:95

Generated on Thu May 22 2025 08:23:50 for QuickFIX by doxygen 1.9.8 written by Dimitri van Heesch, © 1997-2001