Public Types | Public Member Functions | Private Member Functions | Private Attributes | List of all members
FIX::ThreadedSocketConnection Class Reference

Encapsulates a socket file descriptor (multi-threaded). More...

#include <ThreadedSocketConnection.h>

Inheritance diagram for FIX::ThreadedSocketConnection:
Inheritance graph
[legend]
Collaboration diagram for FIX::ThreadedSocketConnection:
Collaboration graph
[legend]

Public Types

typedef std::set< SessionID > Sessions
 

Public Member Functions

 ThreadedSocketConnection (int s, Sessions sessions, Log *pLog)
 
 ThreadedSocketConnection (const SessionID &, int s, const std::string &address, short port, Log *pLog, const std::string &sourceAddress="", short sourcePort=0)
 
virtual ~ThreadedSocketConnection ()
 
Session * getSession () const
 
int getSocket () const
 
bool connect ()
 
void disconnect ()
 
bool read ()
 

Private Member Functions

bool readMessage (std::string &msg) throw ( SocketRecvFailed )
 
void processStream ()
 
bool send (const std::string &)
 
bool setSession (const std::string &msg)
 
- Private Member Functions inherited from FIX::Responder
virtual ~Responder ()
 

Private Attributes

int m_socket
 
char m_buffer [BUFSIZ]
 
std::string m_address
 
int m_port
 
std::string m_sourceAddress
 
int m_sourcePort
 
Log * m_pLog
 
Parser m_parser
 
Sessions m_sessions
 
Session * m_pSession
 
bool m_disconnect
 
fd_set m_fds
 

Detailed Description

Encapsulates a socket file descriptor (multi-threaded).

Definition at line 44 of file ThreadedSocketConnection.h.

Member Typedef Documentation

◆ Sessions

Definition at line 47 of file ThreadedSocketConnection.h.

Constructor & Destructor Documentation

◆ ThreadedSocketConnection() [1/2]

FIX::ThreadedSocketConnection::ThreadedSocketConnection ( int  s,
Sessions  sessions,
Log *  pLog 
)

◆ ThreadedSocketConnection() [2/2]

FIX::ThreadedSocketConnection::ThreadedSocketConnection ( const SessionID &  sessionID,
int  s,
const std::string &  address,
short  port,
Log *  pLog,
const std::string &  sourceAddress = "",
short  sourcePort = 0 
)

Definition at line 44 of file ThreadedSocketConnection.cpp.

49 : m_socket( s ), m_address( address ), m_port( port ),
50 m_sourceAddress( sourceAddress ), m_sourcePort( sourcePort ),
51 m_pLog( pLog ),
52 m_pSession( Session::lookupSession( sessionID ) ),
53 m_disconnect( false )
54{
55 FD_ZERO( &m_fds );
56 FD_SET( m_socket, &m_fds );
57 if ( m_pSession ) m_pSession->setResponder( this );
58}
void setResponder(Responder *pR)
Definition Session.h:210
static Session * lookupSession(const SessionID &)
Definition Session.cpp:1496

References m_fds, m_pSession, m_socket, and FIX::Session::setResponder().

◆ ~ThreadedSocketConnection()

FIX::ThreadedSocketConnection::~ThreadedSocketConnection ( )
virtual

Definition at line 60 of file ThreadedSocketConnection.cpp.

61{
62 if ( m_pSession )
63 {
64 m_pSession->setResponder( 0 );
65 Session::unregisterSession( m_pSession->getSessionID() );
66 }
67}
static void unregisterSession(const SessionID &)
Definition Session.cpp:1547
const SessionID & getSessionID() const
Definition Session.h:75

References FIX::Session::getSessionID(), m_pSession, FIX::Session::setResponder(), and FIX::Session::unregisterSession().

Member Function Documentation

◆ connect()

bool FIX::ThreadedSocketConnection::connect ( )

Definition at line 82 of file ThreadedSocketConnection.cpp.

83{
84 // do the bind in the thread as name resolution may block
85 if ( !m_sourceAddress.empty() || m_sourcePort )
86 socket_bind( m_socket, m_sourceAddress.c_str(), m_sourcePort );
87
88 return socket_connect(getSocket(), m_address.c_str(), m_port) >= 0;
89}
int socket_connect(int socket, const char *address, int port)
Definition Utility.cpp:148
int socket_bind(int socket, const char *hostname, int port)
Definition Utility.cpp:103

References getSocket(), m_address, m_port, m_socket, m_sourceAddress, m_sourcePort, FIX::socket_bind(), and FIX::socket_connect().

Referenced by FIX::ThreadedSocketInitiator::socketThread().

◆ disconnect()

void FIX::ThreadedSocketConnection::disconnect ( )
virtual

◆ getSession()

Session * FIX::ThreadedSocketConnection::getSession ( ) const
inline

Definition at line 56 of file ThreadedSocketConnection.h.

56{ return m_pSession; }

References m_pSession.

Referenced by FIX::ThreadedSocketInitiator::socketThread().

◆ getSocket()

int FIX::ThreadedSocketConnection::getSocket ( ) const
inline

◆ processStream()

void FIX::ThreadedSocketConnection::processStream ( )
private

Definition at line 156 of file ThreadedSocketConnection.cpp.

157{
158 std::string msg;
159 while( readMessage(msg) )
160 {
161 if ( !m_pSession )
162 {
163 if ( !setSession( msg ) )
164 { disconnect(); continue; }
165 }
166 try
167 {
168 m_pSession->next( msg, UtcTimeStamp() );
169 }
170 catch( InvalidMessage& )
171 {
172 if( !m_pSession->isLoggedOn() )
173 {
174 disconnect();
175 return;
176 }
177 }
178 }
179}
void next()
Definition Session.cpp:125
bool isLoggedOn()
Definition Session.h:65
bool setSession(const std::string &msg)

References disconnect(), FIX::Session::isLoggedOn(), m_pSession, FIX::Session::next(), readMessage(), and setSession().

Referenced by read().

◆ read()

bool FIX::ThreadedSocketConnection::read ( )

Definition at line 97 of file ThreadedSocketConnection.cpp.

98{
99 struct timeval timeout = { 1, 0 };
100 fd_set readset = m_fds;
101
102 try
103 {
104 // Wait for input (1 second timeout)
105 int result = select( 1 + m_socket, &readset, 0, 0, &timeout );
106
107 if( result > 0 ) // Something to read
108 {
109 // We can read without blocking
110 ssize_t size = socket_recv( m_socket, m_buffer, sizeof(m_buffer) );
111 if ( size <= 0 ) { throw SocketRecvFailed( size ); }
112 m_parser.addToStream( m_buffer, size );
113 }
114 else if( result == 0 && m_pSession ) // Timeout
115 {
116 m_pSession->next();
117 }
118 else if( result < 0 ) // Error
119 {
120 throw SocketRecvFailed( result );
121 }
122
123 processStream();
124 return true;
125 }
126 catch ( SocketRecvFailed& e )
127 {
128 if( m_disconnect )
129 return false;
130
131 if( m_pSession )
132 {
133 m_pSession->getLog()->onEvent( e.what() );
134 m_pSession->disconnect();
135 }
136 else
137 {
138 disconnect();
139 }
140
141 return false;
142 }
143}
virtual void onEvent(const std::string &)=0
void addToStream(const char *str, size_t len)
Definition Parser.h:48
Log * getLog()
Definition Session.h:227
void disconnect()
Definition Session.cpp:613
ssize_t socket_recv(int s, char *buf, size_t length)
Definition Utility.cpp:170

References FIX::Parser::addToStream(), FIX::Session::disconnect(), disconnect(), FIX::Session::getLog(), m_buffer, m_disconnect, m_fds, m_parser, m_pSession, m_socket, FIX::Session::next(), FIX::Log::onEvent(), processStream(), and FIX::socket_recv().

Referenced by FIX::ThreadedSocketAcceptor::socketConnectionThread(), and FIX::ThreadedSocketInitiator::socketThread().

◆ readMessage()

bool FIX::ThreadedSocketConnection::readMessage ( std::string &  msg)
throw ( SocketRecvFailed )
private

Definition at line 145 of file ThreadedSocketConnection.cpp.

147{
148 try
149 {
150 return m_parser.readFixMessage( msg );
151 }
152 catch ( MessageParseError& ) {}
153 return true;
154}
bool readFixMessage(std::string &str)
Definition Parser.cpp:59

Referenced by processStream().

◆ send()

bool FIX::ThreadedSocketConnection::send ( const std::string &  msg)
private virtual

Implements FIX::Responder.

Definition at line 69 of file ThreadedSocketConnection.cpp.

70{
71 int totalSent = 0;
72 while(totalSent < (int)msg.length())
73 {
74 ssize_t sent = socket_send( m_socket, msg.c_str() + totalSent, msg.length() );
75 if(sent < 0) return false;
76 totalSent += sent;
77 }
78
79 return true;
80}
ssize_t socket_send(int s, const char *msg, size_t length)
Definition Utility.cpp:175

References m_socket, and FIX::socket_send().

◆ setSession()

bool FIX::ThreadedSocketConnection::setSession ( const std::string &  msg)
private

Definition at line 181 of file ThreadedSocketConnection.cpp.

182{
183 m_pSession = Session::lookupSession( msg, true );
184 if ( !m_pSession )
185 {
186 if( m_pLog )
187 {
188 m_pLog->onEvent( "Session not found for incoming message: " + msg );
189 m_pLog->onIncoming( msg );
190 }
191 return false;
192 }
193
194 SessionID sessionID = m_pSession->getSessionID();
195 m_pSession = 0;
196
197 // see if the session frees up within 5 seconds
198 for( int i = 1; i <= 5; i++ )
199 {
200 if( !Session::isSessionRegistered( sessionID ) )
201 m_pSession = Session::registerSession( sessionID );
202 if( m_pSession ) break;
203 process_sleep( 1 );
204 }
205
206 if ( !m_pSession )
207 return false;
208 if ( m_sessions.find(m_pSession->getSessionID()) == m_sessions.end() )
209 return false;
210
211 m_pSession->setResponder( this );
212 return true;
213}
virtual void onIncoming(const std::string &)=0
static Session * registerSession(const SessionID &)
Definition Session.cpp:1537
static bool isSessionRegistered(const SessionID &)
Definition Session.cpp:1531
void process_sleep(double s)
Definition Utility.cpp:466

References FIX::Session::getSessionID(), FIX::Session::isSessionRegistered(), FIX::Session::lookupSession(), m_pLog, m_pSession, m_sessions, FIX::Log::onEvent(), FIX::Log::onIncoming(), FIX::process_sleep(), FIX::Session::registerSession(), and FIX::Session::setResponder().

Referenced by processStream().

Member Data Documentation

◆ m_address

std::string FIX::ThreadedSocketConnection::m_address
private

Definition at line 71 of file ThreadedSocketConnection.h.

Referenced by connect().

◆ m_buffer

char FIX::ThreadedSocketConnection::m_buffer[BUFSIZ]
private

Definition at line 69 of file ThreadedSocketConnection.h.

Referenced by read().

◆ m_disconnect

bool FIX::ThreadedSocketConnection::m_disconnect
private

Definition at line 80 of file ThreadedSocketConnection.h.

Referenced by disconnect(), and read().

◆ m_fds

fd_set FIX::ThreadedSocketConnection::m_fds
private

◆ m_parser

Parser FIX::ThreadedSocketConnection::m_parser
private

Definition at line 77 of file ThreadedSocketConnection.h.

Referenced by read().

◆ m_pLog

Log* FIX::ThreadedSocketConnection::m_pLog
private

Definition at line 76 of file ThreadedSocketConnection.h.

Referenced by setSession().

◆ m_port

int FIX::ThreadedSocketConnection::m_port
private

Definition at line 72 of file ThreadedSocketConnection.h.

Referenced by connect().

◆ m_pSession

Session* FIX::ThreadedSocketConnection::m_pSession
private

◆ m_sessions

Sessions FIX::ThreadedSocketConnection::m_sessions
private

Definition at line 78 of file ThreadedSocketConnection.h.

Referenced by setSession().

◆ m_socket

int FIX::ThreadedSocketConnection::m_socket
private

◆ m_sourceAddress

std::string FIX::ThreadedSocketConnection::m_sourceAddress
private

Definition at line 73 of file ThreadedSocketConnection.h.

Referenced by connect().

◆ m_sourcePort

int FIX::ThreadedSocketConnection::m_sourcePort
private

Definition at line 74 of file ThreadedSocketConnection.h.

Referenced by connect().


The documentation for this class was generated from the following files:

ThreadedSocketConnection.h
ThreadedSocketConnection.cpp

Generated on Thu May 22 2025 08:23:50 for QuickFIX by doxygen 1.9.8 written by Dimitri van Heesch, © 1997-2001