.h
/************************************************************** * Filename: TcpClient.h * Copyright: Shanghai X Co., Ltd. * * Description: TcpClient头文件. * * @author: w * @version 10/28/2016 @Reviser Initial Version **************************************************************/#ifndef _TCPCLIENT_#define _TCPCLIENT_#include#include #include #include #include using namespace std; //连接状态改变时回调typedef void (__stdcall *pfnConnectChangeCallBack)(bool);//接收到数据时回调typedef void (__stdcall *pfnReceiveCallBack)(char*, const int); class CTcpClient : public ACE_Svc_Handler {public: // 是否退出的标识 long m_lStop; public: // 是否允许重连 bool m_nReconnect; // 通信超时ms int m_nCommunicateTimeOut; public: //typedef ACE_Svc_Handler Base; CTcpClient(); virtual ~CTcpClient(); public: /** * 设置连接参数. * * @param -[in] char* szHost: [主链接地址]; * @param -[in] char* szBackup: [备连接地址]; * @param -[in] int nRemotePort: [目标端口号]; * @param -[in] int nLocalPort: [本地端口号]; * @return int. * @version 10/28/2016 w Initial Version */ virtual long SetConnectParam(char* szHost, char* szBackup, int nRemotePort, int nLocalPort = 0); /** * 首次连接. * * @return int. * @version 10/28/2016 w Initial Version */ virtual int Connect(); /** * 断开连接. * * @return int. * @version 10/28/2016 w Initial Version */ virtual int Reconnect(); /** * 断开连接. * * @return int. * @version 10/28/2016 w Initial Version */ virtual int Disconnect(); /** * 发送数据. * * @param -[in,out] char* szSend: [数据] * @param -[in] char* lSendSize: [大小] * @param -[in] int nCommunicateTimeOut: [超时ms] * @return int. * @version 10/28/2016 w Initial Version */ virtual long Send(const char* szSend, long lSendSize, int nCommunicateTimeOut = COMMUNICATE_TIMEOUT); /** * 接收数据. * * @param -[in,out] char* szReceive: [数据] * @param -[in] long lReceiveSize: [大小] * @param -[in] int nCommunicateTimeOut: [超时ms] * @version 10/28/2016 w Initial Version */ virtual long Receive(char* szReceive, long lReceiveSize, int nCommunicateTimeOut = COMMUNICATE_TIMEOUT); // virtual bool IsConnected() { return m_nIsConnected; } /** * 设置连接改变回调函数. * * @version 10/28/2016 w Initial Version */ void SetOnConnectChangeCallBack(pfnConnectChangeCallBack func); /** * 设置数据接收回调函数. * * @version 10/28/2016 w Initial Version */ void SetOnReceiveCallBack(pfnReceiveCallBack func);public: /** * 建立连接时被调用. * * @param -[in,out] char* param: [参数] * @return int. * @version 10/28/2016 w Initial Version */ int open(void* param = 0); /** * 当有输入时该函数被调用. * * @param -[in] ACE_HANDLE: [参数] * @return int. * @version 10/28/2016 w Initial Version */ int handle_input(ACE_HANDLE handle = ACE_INVALID_HANDLE); /** * 当有输出时该函数被调用. * * @param -[in] ACE_HANDLE handle: [参数] * @return int. * @version 10/28/2016 w Initial Version */ virtual int handle_output(ACE_HANDLE handle = ACE_INVALID_HANDLE); /** * 当SockHandler从ACE_Reactor中移除时该函数被调用. * * @param -[in] ACE_HANDLE handle: [参数] * @param -[in] ACE_HANDLE closeMask: [参数] * @return int * @version 10/28/2016 w Initial Version */ int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask closeMask); /** * 任务的主流程. * 1.激活事件 * * @return int. * @version 10/10/2016 w Initial Version */ int svc();protected: /** * 触发连接改变回调函数. * * @param -[in] bool nIsConnected: [是否已连接] * * @version 10/28/2016 w Initial Version */ void OnConnectChange(bool nIsConnected); /** * 触发数据接收回调函数. * * @param -[in,out] char* pszReceive: [接收的数据区] * @param -[in] const int nReceiveSize: [数据大小] * @version 10/28/2016 w Initial Version */ void OnReceive(char* pszReceive, const int nReceiveSize); protected: // 是否已经连接 bool m_nIsConnected; // 当前连接IP地址 string m_strConnectIPAddress; // 主线连接IP地址 string m_strHostIPAddress; // 备线连接IP地址 string m_strBackupIPAddress; // 远程连接端口号 unsigned short m_nRemotePort; // 本地连接端口号 unsigned short m_nLocalPort; pfnConnectChangeCallBack m_pfnOnConnectChange; pfnReceiveCallBack m_pfnOnReceive; protected: // 最后一次连接时间 time_t m_tmLastConnect; private: /** * 关闭Socket. * * @return int. * @version 10/28/2016 w Initial Version */ int CloseSocket(); };//typedef ACE_Connector CONNECTOR;#endif // !_TCPCLIENT_
.cpp
/************************************************************** * Filename: TcpClient.cpp * Copyright: Shanghai X Co., Ltd. * * Description: TcpClient源文件. * * @author: w * @version 10/28/2016 @Reviser Initial Version **************************************************************/ #include "TcpClient.h" #include#include #include #include #include using namespace std;//ctorCTcpClient::CTcpClient(){ m_lStop = true; m_nReconnect = true; m_nIsConnected = false; m_pfnOnConnectChange = NULL; m_pfnOnReceive = NULL; m_strConnectIPAddress = ""; m_strHostIPAddress = ""; m_strBackupIPAddress = ""; m_nRemotePort = 0; m_nLocalPort = 0; m_tmLastConnect = 0; }//dctorCTcpClient::~CTcpClient(){ m_lStop = true; wait(); // close(); m_pfnOnConnectChange = NULL; m_pfnOnReceive = NULL;} long CTcpClient::SetConnectParam(char* szHost, char* szBackup, int nRemotePort, int nLocalPort){ m_strConnectIPAddress = m_strHostIPAddress = szHost; m_strBackupIPAddress = szBackup; m_nRemotePort = nRemotePort; m_nLocalPort = nLocalPort; return 0;}void CTcpClient::SetOnConnectChangeCallBack(pfnConnectChangeCallBack func) { this->m_pfnOnConnectChange = func; }void CTcpClient::SetOnReceiveCallBack(pfnReceiveCallBack func) { this->m_pfnOnReceive = func; }void CTcpClient::OnConnectChange(bool nIsConnected){ m_nIsConnected = nIsConnected; if(!m_nIsConnected) Log(LOGLEVEL_ERROR, "Disconnect from(%s:%d).", m_strConnectIPAddress.c_str(), m_nRemotePort); else Log(LOGLEVEL_NOTICE, "Connected to(%s:%d).", m_strConnectIPAddress.c_str(), m_nRemotePort); // if(m_pfnOnConnectChange) m_pfnOnConnectChange(m_nIsConnected);}void CTcpClient::OnReceive(char* pszReceive, const int nReceiveSize){ ACE_Message_Block *pFrame = new ACE_Message_Block(nReceiveSize); memcpy(pFrame->wr_ptr(), pszReceive, nReceiveSize); pFrame->wr_ptr(nReceiveSize); this->putq(pFrame); /*if(m_pfnOnReceive) m_pfnOnReceive(pszReceive, nReceiveSize);*/ delete[] pszReceive;}int CTcpClient::Disconnect(){ CloseSocket(); OnConnectChange(false); return 0;}int CTcpClient::CloseSocket(){ //ACE_OS::shutdown(get_handle(), ACE_SHUTDOWN_BOTH); //int nRet = ACE_OS::closesocket(m_sockHandler.get_handle()); this->peer().close(); set_handle(ACE_INVALID_HANDLE); return 0;}int CTcpClient::Reconnect(){ //已连接 if (IsConnected()) return 0; //未设置重连机制 if(!m_nReconnect) { Log(LOGLEVEL_INFO, "Reconnect is disabled."); return -1; } //小于超时时间3s不能重连 time_t tmNow; time(&tmNow); if(abs(tmNow - m_tmLastConnect) <= CONNECTION_TIMEOUT) return -1; //清理Socket CloseSocket(); return Connect();} int CTcpClient::Connect(){ //与服务器建立连接 CTcpClient *pSockHandler = this; //创建连接器 ACE_Connector connector; //设置默认连接超时 ACE_Time_Value connTimeOut(CONNECTION_TIMEOUT); ACE_Synch_Options synch_option(ACE_Synch_Options::USE_TIMEOUT, connTimeOut); //远程端点 ACE_INET_Addr remoteEP(m_nRemotePort, m_strConnectIPAddress.c_str()); Log(LOGLEVEL_INFO, "Connecting to(%s:%d) ...", remoteEP.get_host_addr(), remoteEP.get_port_number()); //更新当前连接时间戳 time(&m_tmLastConnect); int nRet = 0; if (m_nLocalPort > 0) { //绑定本地固定端口号 ACE_INET_Addr localEP(m_nLocalPort); nRet = connector.connect(pSockHandler, remoteEP, synch_option, localEP); } else { //绑定本地随机端口号 nRet = connector.connect(pSockHandler, remoteEP, synch_option); } //连接失败 if(nRet == -1) { //轮询切换连接主备服务器(存在) if(!m_strBackupIPAddress.empty() && m_strBackupIPAddress.compare(m_strHostIPAddress) != 0) { m_strConnectIPAddress = m_strConnectIPAddress.compare(m_strHostIPAddress) == 0 ? m_strBackupIPAddress : m_strHostIPAddress; } OnConnectChange(false); return -1; } //启动接收事件(OneTime) if(!nRet && m_lStop) { m_lStop = false; this->activate(THR_NEW_LWP | THR_JOINABLE |THR_INHERIT_SCHED); } OnConnectChange(true); return 0;}int CTcpClient::svc(){ //接收 while(!m_lStop) { ACE_Time_Value tvSleep; tvSleep.msec(TASK_NAP_TIME_VALUE); ACE_OS::sleep(tvSleep); ACE_Time_Value tvWaite(0, TASK_NAP_TIME_VALUE); //BLOCKED this->reactor()->handle_events(&tvWaite); } return 0; }int CTcpClient::open(void* param){ return this->reactor()->register_handler(this, ACE_Event_Handler::READ_MASK | ACE_Event_Handler::WRITE_MASK); //if (Base::open(param) == -1) //{ // Log(LOGLEVEL_ERROR, "open() Failied."); // return -1; //} //return 0;}int CTcpClient::handle_input(ACE_HANDLE){ char *szBuffer = new char[DEFAULT_BUFFER_SIZE]; //接收数据 ssize_t length = this->peer().recv(szBuffer, DEFAULT_BUFFER_SIZE); //连接断开接收失败 if(length <= 0) { delete[] szBuffer; return -1;//implicit call handle_close() clear up } OnReceive(szBuffer, length); return 0;}int CTcpClient::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask closeMask){ int nRet = ACE_Event_Handler::handle_close(handle, closeMask); Disconnect(); return nRet;}int CTcpClient::handle_output(ACE_HANDLE handle /* = ACE_INVALID_HANDLE */){ //调用一次 return 0;}long CTcpClient::Receive(char* szReceive, long lReceiveSize, int nCommunicateTimeOut){ //Confirmed //implicit call handle_close() clear up if(!IsConnected()) return -1; ACE_Time_Value tvTimeout(0, nCommunicateTimeOut); //return this->peer().recv((void *)szReceive, lReceiveSize, &tvTimeout); return this->peer().recv_n((void *)szReceive, lReceiveSize, &tvTimeout);} long CTcpClient::Send(const char* szSend, long lSendSize, int nCommunicateTimeOut){ //Uncertainty //implicit call handle_close() clear up if(!IsConnected()) return -1; ACE_Time_Value tvTimeout(0, nCommunicateTimeOut); ssize_t length = this->peer().send_n(szSend, lSendSize, &tvTimeout); return length;}