diff --git a/client/IOCPClient.cpp b/client/IOCPClient.cpp index 695af18..d5f38f7 100644 --- a/client/IOCPClient.cpp +++ b/client/IOCPClient.cpp @@ -1,4 +1,4 @@ -// IOCPClient.cpp: implementation of the IOCPClient class. +// IOCPClient.cpp: implementation of the IOCPClient class. // ////////////////////////////////////////////////////////////////////// #ifdef _WIN32 @@ -51,27 +51,27 @@ inline int WSAGetLastError() #ifndef _WIN32 BOOL SetKeepAliveOptions(int socket, int nKeepAliveSec = 180) { - // TCP ѡ + // 启用 TCP 保活选项 int enable = 1; if (setsockopt(socket, SOL_SOCKET, SO_KEEPALIVE, &enable, sizeof(enable)) < 0) { Mprintf("Failed to enable TCP keep-alive\n"); return FALSE; } - // TCP_KEEPIDLE (3ӿкʼ keep-alive ) + // 设置 TCP_KEEPIDLE (3分钟空闲后开始发送 keep-alive 包) if (setsockopt(socket, IPPROTO_TCP, TCP_KEEPIDLE, &nKeepAliveSec, sizeof(nKeepAliveSec)) < 0) { Mprintf("Failed to set TCP_KEEPIDLE\n"); return FALSE; } - // TCP_KEEPINTVL (5Լ) - int keepAliveInterval = 5; // 5 + // 设置 TCP_KEEPINTVL (5秒的重试间隔) + int keepAliveInterval = 5; // 5秒 if (setsockopt(socket, IPPROTO_TCP, TCP_KEEPINTVL, &keepAliveInterval, sizeof(keepAliveInterval)) < 0) { Mprintf("Failed to set TCP_KEEPINTVL\n"); return FALSE; } - // TCP_KEEPCNT (5̽ΪӶϿ) + // 设置 TCP_KEEPCNT (最多5次探测包后认为连接断开) int keepAliveProbes = 5; if (setsockopt(socket, IPPROTO_TCP, TCP_KEEPCNT, &keepAliveProbes, sizeof(keepAliveProbes)) < 0) { Mprintf("Failed to set TCP_KEEPCNT\n"); @@ -169,7 +169,7 @@ IOCPClient::~IOCPClient() SAFE_DELETE(m_Encoder); } -// ȡIPַ +// 从域名获取IP地址 std::string GetIPAddress(const char *hostName) { #ifdef _WIN32 @@ -180,9 +180,9 @@ std::string GetIPAddress(const char *hostName) struct hostent *host = gethostbyname(hostName); #ifdef _DEBUG if (host == NULL) return ""; - Mprintf("IPΪ: %s.\n", host->h_addrtype == AF_INET ? "IPV4" : "IPV6"); + Mprintf("此域名的IP类型为: %s.\n", host->h_addrtype == AF_INET ? "IPV4" : "IPV6"); for (int i = 0; host->h_addr_list[i]; ++i) - Mprintf("ȡĵ%dIP: %s\n", i+1, inet_ntoa(*(struct in_addr*)host->h_addr_list[i])); + Mprintf("获取的第%d个IP: %s\n", i+1, inet_ntoa(*(struct in_addr*)host->h_addr_list[i])); #endif if (host == NULL || host->h_addr_list == NULL) return ""; @@ -205,18 +205,18 @@ std::string GetIPAddress(const char *hostName) Mprintf("IP Address: %s \n", ip); - freeaddrinfo(res); // ҪͷŵַϢ + freeaddrinfo(res); // 不要忘记释放地址信息 return ip; #endif } BOOL ConnectWithTimeout(SOCKET sock, SOCKADDR *addr, int timeout_sec=5) { - // ʱΪ + // 临时设为非阻塞 u_long mode = 1; ioctlsocket(sock, FIONBIO, &mode); - // ӣ + // 发起连接(非阻塞) int ret = connect(sock, addr, sizeof(*addr)); if (ret == SOCKET_ERROR) { int err = WSAGetLastError(); @@ -225,7 +225,7 @@ BOOL ConnectWithTimeout(SOCKET sock, SOCKADDR *addr, int timeout_sec=5) } } - // ȴдɻʧܣ + // 等待可写(代表连接完成或失败) fd_set writefds; FD_ZERO(&writefds); FD_SET(sock, &writefds); @@ -236,10 +236,10 @@ BOOL ConnectWithTimeout(SOCKET sock, SOCKADDR *addr, int timeout_sec=5) ret = select(0, NULL, &writefds, NULL, &tv); if (ret <= 0 || !FD_ISSET(sock, &writefds)) { - return FALSE; // ʱ + return FALSE; // 超时或出错 } - // Ƿɹ + // 检查连接是否真正成功 int error = 0; int len = sizeof(error); getsockopt(sock, SOL_SOCKET, SO_ERROR, (char*)&error, &len); @@ -247,7 +247,7 @@ BOOL ConnectWithTimeout(SOCKET sock, SOCKADDR *addr, int timeout_sec=5) return FALSE; } - // Ļģʽ + // 改回阻塞模式 mode = 0; ioctlsocket(sock, FIONBIO, &mode); @@ -263,7 +263,7 @@ BOOL IOCPClient::ConnectServer(const char* szServerIP, unsigned short uPort) m_masker->SetServer(m_sCurIP.c_str()); unsigned short port = m_nHostPort; - m_sClientSocket = socket(AF_INET,SOCK_STREAM, IPPROTO_TCP); // + m_sClientSocket = socket(AF_INET,SOCK_STREAM, IPPROTO_TCP); //传输层 if (m_sClientSocket == SOCKET_ERROR) { return FALSE; @@ -284,45 +284,57 @@ BOOL IOCPClient::ConnectServer(const char* szServerIP, unsigned short uPort) #else m_ServerAddr.sin_family = AF_INET; m_ServerAddr.sin_port = htons(port); - // szServerIPֿͷΪIPת - // ʹ inet_pton inet_addr (inet_pton ֧ IPv4 IPv6) + // 若szServerIP非数字开头,则认为是域名,需进行IP转换 + // 使用 inet_pton 替代 inet_addr (inet_pton 可以支持 IPv4 和 IPv6) if (inet_pton(AF_INET, m_sCurIP.c_str(), &m_ServerAddr.sin_addr) <= 0) { Mprintf("Invalid address or address not supported\n"); return false; } - // ׽ + // 创建套接字 if (m_sClientSocket == -1) { Mprintf("Failed to create socket\n"); return false; } - // ӵ + // 连接到服务器 if (connect(m_sClientSocket, (struct sockaddr*)&m_ServerAddr, sizeof(m_ServerAddr)) == -1) { Mprintf("Connection failed\n"); close(m_sClientSocket); - m_sClientSocket = -1; // ׽Ч + m_sClientSocket = -1; // 标记套接字无效 return false; } #endif const int chOpt = 1; // True - // Set KeepAlive , ֹ˲ + + // 启用 TCP_NODELAY 禁用 Nagle 算法,减少小包延迟 + int nodelay = 1; + setsockopt(m_sClientSocket, IPPROTO_TCP, TCP_NODELAY, (char*)&nodelay, sizeof(nodelay)); + + // 增大发送缓冲区到 256KB + int sendBufSize = 256 * 1024; + setsockopt(m_sClientSocket, SOL_SOCKET, SO_SNDBUF, (char*)&sendBufSize, sizeof(sendBufSize)); + + // Set KeepAlive 开启保活机制, 防止服务端产生死连接 if (setsockopt(m_sClientSocket, SOL_SOCKET, SO_KEEPALIVE, (char *)&chOpt, sizeof(chOpt)) == 0) { #ifdef _WIN32 - // óʱϸϢ + // 设置超时详细信息 tcp_keepalive klive; - klive.onoff = 1; // ñ - klive.keepalivetime = 1000 * 60 * 3; // 3ӳʱ Keep Alive - klive.keepaliveinterval = 1000 * 5; // ԼΪ5 Resend if No-Reply + klive.onoff = 1; // 启用保活 + klive.keepalivetime = 1000 * 60 * 3; // 3分钟超时 Keep Alive + klive.keepaliveinterval = 1000 * 5; // 重试间隔为5秒 Resend if No-Reply WSAIoctl(m_sClientSocket, SIO_KEEPALIVE_VALS,&klive,sizeof(tcp_keepalive), NULL, 0,(unsigned long *)&chOpt,0,NULL); #else - // ñѡ + // 设置保活选项 SetKeepAliveOptions(m_sClientSocket); #endif } + m_bConnected = TRUE; + Mprintf("连接服务端成功.\n"); + if (m_hWorkThread == NULL) { #ifdef _WIN32 m_hWorkThread = (HANDLE)__CreateThread(NULL, 0, WorkThreadProc,(LPVOID)this, 0, NULL); @@ -332,8 +344,7 @@ BOOL IOCPClient::ConnectServer(const char* szServerIP, unsigned short uPort) m_hWorkThread = (HANDLE)pthread_create(&id, nullptr, (void* (*)(void*))IOCPClient::WorkThreadProc, this); #endif } - Mprintf("ӷ˳ɹ.\n"); - m_bConnected = TRUE; + return TRUE; } @@ -345,7 +356,7 @@ DWORD WINAPI IOCPClient::WorkThreadProc(LPVOID lParam) struct timeval tm = { 2, 0 }; CBuffer m_CompressedBuffer; - while (This->IsRunning()) { // û˳һֱѭ + while (This->IsRunning()) { // 没有退出,就一直陷在这个循环中 if(!This->IsConnected()) { Sleep(50); continue; @@ -361,7 +372,7 @@ DWORD WINAPI IOCPClient::WorkThreadProc(LPVOID lParam) if (iRet == 0) Sleep(50); else { Mprintf("[select] return %d, GetLastError= %d. \n", iRet, WSAGetLastError()); - This->Disconnect(); //մ + This->Disconnect(); //接收错误处理 m_CompressedBuffer.ClearBuffer(); if(This->m_exit_while_disconnect) break; @@ -387,21 +398,21 @@ bool IOCPClient::ProcessRecvData(CBuffer *m_CompressedBuffer, char *szBuffer, in if (iReceivedLength <= 0) { int a = WSAGetLastError(); Mprintf("[recv] return %d, GetLastError= %d. \n", iReceivedLength, a); - Disconnect(); //մ + Disconnect(); //接收错误处理 m_CompressedBuffer->ClearBuffer(); if (m_exit_while_disconnect) return false; } else { szBuffer[iReceivedLength] = 0; - //ȷվ͵OnRead,תOnRead + //正确接收就调用OnRead处理,转到OnRead OnServerReceiving(m_CompressedBuffer, szBuffer, iReceivedLength); } return true; } -// 쳣ݴ߼: -// f ִʱ ûдϵͳ쳣ʳͻ 0 -// f ִй ׳쳣ָʣ __except 񣬷쳣루 0xC0000005 ʾΥ棩 +// 带异常处理的数据处理逻辑: +// 如果 f 执行时 没有触发系统异常(如访问冲突),返回 0 +// 如果 f 执行过程中 抛出了异常(比如空指针访问),将被 __except 捕获,返回异常码(如 0xC0000005 表示访问违规) int DataProcessWithSEH(DataProcessCB f, void* manager, LPBYTE data, ULONG len) { __try { @@ -417,12 +428,12 @@ VOID IOCPClient::OnServerReceiving(CBuffer* m_CompressedBuffer, char* szBuffer, { try { assert (ulLength > 0); - //½ӵݽнѹ + //以下接到数据进行解压缩 m_CompressedBuffer->WriteBuffer((LPBYTE)szBuffer, ulLength); int FLAG_LENGTH = m_Encoder->GetFlagLen(); int HDR_LENGTH = m_Encoder->GetHeadLen(); - //ǷͷС ǾͲȷ + //检测数据是否大于数据头大小 如果不是那就不是正确的数据 while (m_CompressedBuffer->GetBufferLength() > HDR_LENGTH) { // UnMask char* src = (char*)m_CompressedBuffer->GetBuffer(); @@ -437,7 +448,7 @@ VOID IOCPClient::OnServerReceiving(CBuffer* m_CompressedBuffer, char* szBuffer, char szPacketFlag[32] = {0}; src = (char*)m_CompressedBuffer->GetBuffer(); CopyMemory(szPacketFlag, src, FLAG_LENGTH); - //жͷ + //判断数据头 HeaderEncType encType = HeaderEncUnknown; FlagType flagType = CheckHead(szPacketFlag, encType); if (flagType == FLAG_UNKNOWN) { @@ -450,12 +461,12 @@ VOID IOCPClient::OnServerReceiving(CBuffer* m_CompressedBuffer, char* szBuffer, ULONG ulPackTotalLength = 0; CopyMemory(&ulPackTotalLength, m_CompressedBuffer->GetBuffer(FLAG_LENGTH), sizeof(ULONG)); - //--- ݵĴСȷж + //--- 数据的大小正确判断 ULONG len = m_CompressedBuffer->GetBufferLength(); if (ulPackTotalLength && len >= ulPackTotalLength) { ULONG ulOriginalLength = 0; - m_CompressedBuffer->ReadBuffer((PBYTE)szPacketFlag, FLAG_LENGTH);//ȡͷ shine + m_CompressedBuffer->ReadBuffer((PBYTE)szPacketFlag, FLAG_LENGTH);//读取各种头部 shine m_CompressedBuffer->ReadBuffer((PBYTE) &ulPackTotalLength, sizeof(ULONG)); m_CompressedBuffer->ReadBuffer((PBYTE) &ulOriginalLength, sizeof(ULONG)); @@ -469,9 +480,9 @@ VOID IOCPClient::OnServerReceiving(CBuffer* m_CompressedBuffer, char* szBuffer, m_Encoder->Decode(CompressedBuffer, ulCompressedLength, (LPBYTE)szPacketFlag); size_t iRet = uncompress(DeCompressedBuffer, &ulOriginalLength, CompressedBuffer, ulCompressedLength); - if (Z_SUCCESS(iRet)) { //ѹɹ - //ѹõݺͳȴݸManagerд ע˶̬ - //m_pManagerе಻һɵõOnReceiveһ + if (Z_SUCCESS(iRet)) { //如果解压成功 + //解压好的数据和长度传递给对象Manager进行处理 注意这里是用了多态 + //由于m_pManager中的子类不一样造成调用的OnReceive函数不一样 int ret = DataProcessWithSEH(m_DataProcess, m_Manager, DeCompressedBuffer, ulOriginalLength); if (ret) { Mprintf("[ERROR] DataProcessWithSEH return exception code: [0x%08X]\n", ret); @@ -494,17 +505,17 @@ VOID IOCPClient::OnServerReceiving(CBuffer* m_CompressedBuffer, char* szBuffer, } -// serverݣѹȽϺʱ -// رѹʱSendWithSplitȽϺʱ +// 向server发送数据,压缩操作比较耗时。 +// 关闭压缩开关时,SendWithSplit比较耗时。 BOOL IOCPClient::OnServerSending(const char* szBuffer, ULONG ulOriginalLength, PkgMask* mask) //Hello { AUTO_TICK(40); assert (ulOriginalLength > 0); { int cmd = BYTE(szBuffer[0]); - //1.001Ҳѹռõڴռԭһ +12 - //ֹ// HelloWorld 10 22 - //ѹ ѹ㷨 ΢ṩ + //乘以1.001是以最坏的也就是数据压缩后占用的内存空间和原先一样 +12 + //防止缓冲区溢出// HelloWorld 10 22 + //数据压缩 压缩算法 微软提供 //nSize = 436 //destLen = 448 #if USING_ZLIB @@ -540,7 +551,7 @@ BOOL IOCPClient::OnServerSending(const char* szBuffer, ULONG ulOriginalLength, P if (CompressedBuffer != buf) delete [] CompressedBuffer; STOP_TICK; - // ֿ鷢 + // 分块发送 return SendWithSplit((char*)m_WriteBuffer.GetBuffer(), m_WriteBuffer.GetBufferLength(), MAX_SEND_BUFFER, cmd, mask); } } @@ -560,16 +571,26 @@ BOOL IOCPClient::SendWithSplit(const char* src, ULONG srcSize, ULONG ulSplitLeng Mprintf("SendWithSplit: %d bytes large packet may causes issues.\n", srcSize); } bool isFail = false; - int iReturn = 0; //˶ + int iReturn = 0; //真正发送了多少 const char* Travel = szBuffer; int i = 0; int ulSended = 0; const int ulSendRetry = 15; - // η - for (i = ulLength; i >= ulSplitLength; i -= ulSplitLength) { + + // 大包优化:当数据量超过阈值时,尝试一次性发送更大的块 + // SO_SNDBUF 已设为 256KB,可以尝试一次发送更多数据 + const ULONG LARGE_PACKET_THRESHOLD = 256 * 1024; // 256KB + ULONG actualSplitLength = ulSplitLength; + if (ulLength >= LARGE_PACKET_THRESHOLD) { + // 大包使用更大的分块,减少系统调用次数 + actualSplitLength = 256 * 1024; // 一次发送256KB + } + + // 依次发送 + for (i = ulLength; i >= (int)actualSplitLength; i -= actualSplitLength) { int j = 0; for (; j < ulSendRetry; ++j) { - iReturn = SendTo(Travel, ulSplitLength, 0); + iReturn = SendTo(Travel, actualSplitLength, 0); if (iReturn > 0) { break; } @@ -580,9 +601,9 @@ BOOL IOCPClient::SendWithSplit(const char* src, ULONG srcSize, ULONG ulSplitLeng } ulSended += iReturn; - Travel += ulSplitLength; + Travel += actualSplitLength; } - // IJ + // 发送最后的部分 if (!isFail && i>0) { //1024 int j = 0; for (; j < ulSendRetry; j++) { diff --git a/client/IOCPClient.h b/client/IOCPClient.h index fae2020..70bc921 100644 --- a/client/IOCPClient.h +++ b/client/IOCPClient.h @@ -1,4 +1,4 @@ -// IOCPClient.h: interface for the IOCPClient class. +// IOCPClient.h: interface for the IOCPClient class. // ////////////////////////////////////////////////////////////////////// @@ -22,7 +22,7 @@ #include "IOCPBase.h" #define MAX_RECV_BUFFER 1024*32 -#define MAX_SEND_BUFFER 1024*32 +#define MAX_SEND_BUFFER 1024*128 // 增大分块大小以提高发送效率 enum { S_STOP = 0, S_RUN, S_END }; @@ -115,7 +115,7 @@ public: Mprintf("IOCPManager DataProcess on NULL ptr: %d\n", unsigned(szBuffer[0])); return FALSE; } - // ȴ׼ܴ, 1㹻 + // 等待子类准备就绪才能处理数据, 1秒足够了 int i = 0; for (; i < 1000 && !m_Manager->IsReady(); ++i) Sleep(1); @@ -212,11 +212,11 @@ public: protected: virtual int ReceiveData(char* buffer, int bufSize, int flags) { - // TCP汾 recv + // TCP版本调用 recv return recv(m_sClientSocket, buffer, bufSize - 1, 0); } virtual bool ProcessRecvData(CBuffer* m_CompressedBuffer, char* szBuffer, int len, int flag); - virtual VOID Disconnect(); // ֧ TCP/UDP + virtual VOID Disconnect(); // 函数支持 TCP/UDP virtual int SendTo(const char* buf, int len, int flags) { return ::send(m_sClientSocket, buf, len, flags); @@ -236,14 +236,14 @@ protected: CLock m_Locker; #if USING_CTX - ZSTD_CCtx* m_Cctx; // ѹ - ZSTD_DCtx* m_Dctx; // ѹ + ZSTD_CCtx* m_Cctx; // 压缩上下文 + ZSTD_DCtx* m_Dctx; // 解压上下文 #endif - const State& g_bExit; // ȫ״̬ - void* m_Manager; // û - DataProcessCB m_DataProcess; // û - ProtocolEncoder* m_Encoder; // + const State& g_bExit; // 全局状态量 + void* m_Manager; // 用户数据 + DataProcessCB m_DataProcess; // 处理用户数据 + ProtocolEncoder* m_Encoder; // 加密 DomainPool m_Domain; std::string m_sCurIP; int m_nHostPort; diff --git a/client/IOCPUDPClient.cpp b/client/IOCPUDPClient.cpp index 6910282..9747cd6 100644 --- a/client/IOCPUDPClient.cpp +++ b/client/IOCPUDPClient.cpp @@ -33,6 +33,8 @@ BOOL IOCPUDPClient::ConnectServer(const char* szServerIP, unsigned short uPort) #endif // UDP connect()Ҳ TCP keep-alive ѡ + Mprintf("UDP client socket created and ready to send.\n"); + m_bConnected = TRUE; // ̣߳Ҫ if (m_hWorkThread == NULL) { @@ -45,8 +47,6 @@ BOOL IOCPUDPClient::ConnectServer(const char* szServerIP, unsigned short uPort) #endif } - Mprintf("UDP client socket created and ready to send.\n"); - m_bConnected = TRUE; return TRUE; }