IO 完成端口的接收缓冲区始终为空

问题描述

好的,这与之前的帖子有关,但这是一个不同的错误,所以我提出了一个新问题。上一篇IO Completion port returning NULL Completion Key

因此在我设置的 IO 完成端口上收到接收消息,它触发 GetQueuedCompletionStatus 并返回完成键和重叠数据。两者看起来都不错,我可以看到它们的结构中填充了数据。然而,传递给 WSARecv 的 Buffer 没有填充传入的消息。 (BytesTransfered 表示收到了字节,但 WSABUF 中没有数据)。

这是目前的代码,正在寻找有关为什么没有填充缓冲区的帮助。

networkhandlerthread.ccp

#include "NetworkHandlerThread.h"

// Worker thread,processes IOCP messages.
DWORD ServerWorkerThread(LPVOID lpParam)
{
    HANDLE CompletionPort = (HANDLE)lpParam;
    DWORD BytesTransferred = 0;
    OVERLAPPED* lpOverlapped = NULL;
    LPCONNECTED_SOCKET_DATA ConnectedSocketData = NULL;
    LPPER_IO_OPERATION_DATA PerIoData = NULL;
    DWORD Flags = 0;
    WSABUF* DataBuf;
    DWORD RecvBytes = 0;
    Type1MessageParser Type1MsgParser;
    Type2MessageParser Type2MsgParser;
    int DestinationAddress = 0;
    bool IsType1 = false;

    while (TRUE)//run forever
    {
        //Check for new message
        if (GetQueuedCompletionStatus(CompletionPort,&BytesTransferred,(PULONG_PTR)&ConnectedSocketData,(LPOVERLAPPED*)&PerIoData,INFINITE) == 0)
        {
            DWORD Err = GetLastError();
            if (Err != WAIT_TIMEOUT)
            {
                printf("GetQueuedCompletionStatus() Failed with error %d\n",Err);

                if (closesocket(ConnectedSocketData->Socket) == SOCKET_ERROR)
                {
                    printf("closesocket() Failed with error %d\n",WSAGetLastError());
                    return 0;
                }

                GlobalFree(ConnectedSocketData);
            }
            continue;
        }


        //We have a message,determine if it's something we receaved or something we should send.
        if (PerIoData->OperationType == OPERATION_TYPE_RECV)
        {
            ///tbd process recv
            ConnectedSocketData; //this is comming in good and has data
            PerIoData->Buffer; // this is empty (pointer is good,but no data)
        }
        else if (PerIoData->OperationType == OPERATION_TYPE_SEND)
        {
            ///tbd process send
        }
    }
};


//Thread for handling Listener sockets and Accepting connections
DWORD ListenThread(LPVOID lpParam)
{
    LPLISTEN_SOCKET_DATA pSocketData = (LPLISTEN_SOCKET_DATA)(lpParam);
    WSANETWORKEVENTS NetworkEvents;
    DWORD dwRet;
    SOCKADDR_IN NewSockAddr;
    SOCKET      NewSocket;
    int         nLen;

    while (true) //run forever
    {
        //Wait for event
        dwRet = WSAWaitForMultipleEvents(1,&(pSocketData->hAcceptEvent),false,100,false);

        //nothing happened,back to top
        if (dwRet == WSA_WAIT_TIMEOUT)
            continue;

        //We got a event,find out which one.
        int nRet = WSAEnumNetworkEvents(pSocketData->Socket,pSocketData->hAcceptEvent,&NetworkEvents);
        if (nRet == SOCKET_ERROR)
        {
            wprintf(L"WSAEnumNetworkEvents error %ld\n",WSAGetLastError());
            break;
        }

        //We got a Accept event
        if (NetworkEvents.lNetworkEvents & FD_ACCEPT)
        {
            //Check for errors
            if (NetworkEvents.iErrorCode[FD_ACCEPT_BIT] == 0)
            {

                // Accept new connection
                nLen = sizeof(SOCKADDR_IN);
                NewSocket = WSAAccept(pSocketData->Socket,(LPSOCKADDR)&NewSockAddr,&nLen,NULL,NULL);
                if (NewSocket == SOCKET_ERROR)
                {
                    wprintf(L"accept() error %ld\n",WSAGetLastError());
                    break;
                }

                wprintf(L"Accepted Connection %ld",NewSockAddr.sin_addr.S_un.S_addr);

                //Set new connection as TCP connection,No Delay
                //const char chOpt = 1;
                //int nErr = setsockopt(NewSocket,IPPROTO_TCP,TCP_NODELAY,&chOpt,sizeof(char));
                //if (nErr == -1)
                //{
                //    wprintf(L"setsockopt() error %ld\n",WSAGetLastError());
                //    break;
                //}


                LPCONNECTED_SOCKET_DATA ConnectedSocketData = new CONNECTED_SOCKET_DATA;

                ZeroMemory(ConnectedSocketData,sizeof(CONNECTED_SOCKET_DATA));

                ConnectedSocketData->Socket = NewSocket;
                ConnectedSocketData->Port = pSocketData->Port;
                ConnectedSocketData->IOCP = pSocketData->IOCP;
                ConnectedSocketData->CfgHandle = pSocketData->CfgHandle;
                ConnectedSocketData->ForwardMessager = pSocketData->ForwardMessager;

                //Add the new socket to the completion port,message from the socker will be queued up for proccessing by worker threads.
                if (CreateIoCompletionPort((HANDLE)NewSocket,pSocketData->IOCP,(DWORD_PTR)ConnectedSocketData,0) == NULL)
                {
                    wprintf(L"CreateIOCompletionPort error %ld\n",WSAGetLastError());
                    delete ConnectedSocketData;
                    ConnectedSocketData = NULL;
                    closesocket(NewSocket);
                    break;
                }

                //Set the PerIOData,will be used at completion time
                LPPER_IO_OPERATION_DATA PerIoData;
                PerIoData = (LPPER_IO_OPERATION_DATA)GlobalAlloc(GPTR,sizeof(PER_IO_OPERATION_DATA));

                ZeroMemory(&(PerIoData->overlapped),sizeof(OVERLAPPED));
                PerIoData->BufferLen = 0;
                PerIoData->OperationType = OPERATION_TYPE_RECV;
                DWORD RecvBytes = 0;
                DWORD Flags = 0;
                PerIoData->Buffer.buf = PerIoData->cBuffer;
                PerIoData->Buffer.len = DATA_BUFSIZE;


                //Kick off the first Recv request for the Socket,will be handled by the completion Queue.
                if (WSARecv(NewSocket,&(PerIoData->Buffer),1,&RecvBytes,&Flags,&(PerIoData->overlapped),NULL) == SOCKET_ERROR)
                {
                    wprintf(L"WSARecv error %ld\n",WSAGetLastError());
                    return 0;
                }
            }
            else
            {
                wprintf(L"UnkNown network event error %ld\n",WSAGetLastError());
                break;
            }
        }
    }
}


NetworkHandlerThread::NetworkHandlerThread()
{
    m_CompletionPort = 0;
    m_hListenThread = 0;
}

NetworkHandlerThread::~NetworkHandlerThread()
{

}

void NetworkHandlerThread::StartNetworkHandler()
{
    int iResult = 0;
    SYstem_INFO SystemInfo;
    unsigned int i = 0;

    //Start WSA
    iResult = WSAStartup(MAKEWORD(2,2),&wsaData);
    if (iResult != NO_ERROR) {
        wprintf(L"WSAStartup() Failed with error: %d\n",iResult);
        return;
    }

    //Start Completion Port
    m_CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE,0);
    if (m_CompletionPort != NULL)
    {
        wprintf(L"Completion Port Created\n");
    }

    //Get # of system processors
    GetSystemInfo(&SystemInfo);

    //create Worker Threads for each processor.
    for (i = 0; i < SystemInfo.dwNumberOfProcessors * THREADS_PER_PROCESSOR; i++)
    {
        HANDLE ThreadHandle;

        // Create a server worker thread,and pass the
        // completion port to the thread. 
        ThreadHandle = CreateThread(NULL,ServerWorkerThread,m_CompletionPort,NULL);

        // Close the thread handle
        if (ThreadHandle != NULL)
        {
            CloseHandle(ThreadHandle);
        }
    }
}

void NetworkHandlerThread::AddListenThread(int Port,ConfigHandler* pConfigHandle,void* ForwardHandle)
{
    SOCKADDR_IN InternetAddr;
    int iResult = 0;
    LPLISTEN_SOCKET_DATA pListenSocketData = new LISTEN_SOCKET_DATA;

    if (pListenSocketData == NULL)
    {
        return;
    }

    //Create the listener Socket
    pListenSocketData->Socket = WSASocket(AF_INET,SOCK_STREAM,WSA_FLAG_OVERLAPPED);
    if (pListenSocketData->Socket == INVALID_SOCKET)
    {
        wprintf(L"socket function Failed with error: %ld\n",WSAGetLastError());
        WSACleanup();
        return;
    }

    // Create a Event to handle Socket Accepts
    pListenSocketData->hAcceptEvent = WSACreateEvent();
    if (pListenSocketData->hAcceptEvent == WSA_INVALID_EVENT)
    {
        wprintf(L"WSACreateEvent() error %ld\n",WSAGetLastError());
        closesocket(pListenSocketData->Socket);
        return;
    }

    // Set the Event to Trigger on FD_ACCEPT (this occurs on socket connection attempts)
    int nRet = WSAEventSelect(pListenSocketData->Socket,pListenSocketData->hAcceptEvent,FD_ACCEPT);
    if (nRet == SOCKET_ERROR)
    {
        wprintf(L"WSAAsyncSelect() error %ld\n",WSAGetLastError());
        closesocket(pListenSocketData->Socket);
        return;
    }

    //Assign the Port Number
    InternetAddr.sin_family = AF_INET;
    InternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    InternetAddr.sin_port = htons(Port);
    pListenSocketData->Port = Port;
    pListenSocketData->IOCP = m_CompletionPort;
    pListenSocketData->CfgHandle = pConfigHandle;
    pListenSocketData->ForwardMessager = ForwardHandle;

    //Bind the Socket to the Port
    iResult = ::bind((pListenSocketData->Socket),(sockaddr*)&InternetAddr,sizeof(InternetAddr));
    if (iResult == SOCKET_ERROR) {
        wprintf(L"bind function Failed with error %d\n",WSAGetLastError());
        iResult = closesocket(pListenSocketData->Socket);
        if (iResult == SOCKET_ERROR)
            wprintf(L"closesocket function Failed with error %d\n",WSAGetLastError());
        WSACleanup();
        return;
    }

    //Listen for incoming connection requests.
    if (listen(pListenSocketData->Socket,SOMAXCONN) == SOCKET_ERROR)
    {
        wprintf(L"listen function Failed with error: %d\n",WSAGetLastError());
        closesocket(pListenSocketData->Socket);
        WSACleanup();
        return;
    }

    wprintf(L"Listening on %ld",Port);

    m_hListenThread = (HANDLE)CreateThread(NULL,// Security
        0,// Stack size - use default
        ListenThread,// Thread fn entry point
        (void*)pListenSocketData,//Listen Socket Data
        0,// Init flag
        NULL);  // Thread address
}

NetworkHandlerThread.h

#pragma once
#include <WinSock2.h>
#include <ws2tcpip.h>
#include <stdio.h>
#include "ForwardMessageHandler.h"
#include "ConfigHandler.h"
#include "Type1MessageParser.h"
#include "Type2Message-Parser.h"
#include "ThreadUtilities.h"

#define DATA_BUFSIZE 8192
#define THREADS_PER_PROCESSOR 2

class NetworkHandlerThread
{
public:
    WSADATA wsaData;
    HANDLE m_CompletionPort;
    HANDLE m_hListenThread;

public:
    NetworkHandlerThread();
    ~NetworkHandlerThread();

    void StartNetworkHandler();

    void AddListenThread(int Port,void* ForwardHandle);
};

线程实用程序.h

#pragma once
#include <mutex>
#include "ConfigHandler.h"


using namespace std;

#define DATA_BUFSIZE 8192
#define THREADS_PER_PROCESSOR 2

typedef struct _THREAD_MESSAGE
{
    mutex cmd_mtx;
    string command;
} THREAD_MESSAGE,* LPTHREAD_MESSAGE;

typedef struct _LISTEN_SOCKET_DATA
{
    SOCKET Socket;
    int    Port;
    HANDLE hAcceptEvent;
    HANDLE IOCP;
    VOID* ForwardMessager;
    ConfigHandler* CfgHandle;
    // Other information useful to be associated with the handle
} LISTEN_SOCKET_DATA,* LPLISTEN_SOCKET_DATA;

typedef struct _CONNECTED_SOCKET_DATA
{
    SOCKET Socket;
    int Port;
    HANDLE IOCP;
    VOID* ForwardMessager;
    ConfigHandler* CfgHandle;
} CONNECTED_SOCKET_DATA,* LPCONNECTED_SOCKET_DATA;

#define OPERATION_TYPE_UNKNowN      0
#define OPERATION_TYPE_SEND         1
#define OPERATION_TYPE_RECV         2
typedef struct PER_IO_OPERATION_DATA
{
    OVERLAPPED overlapped;
    WSABUF Buffer;
    char cBuffer[DATA_BUFSIZE];
    int BufferLen;
    int OperationType;
    string PacketName;
};

#define LPPER_IO_OPERATION_DATA PER_IO_OPERATION_DATA

解决方法

因此,在长时间查看代码后犯了愚蠢的错误(在睡个好觉后立即发现),Recv 获取了十六进制数据,第一个字符是 0x00。在调试器中查看时显示为空文本字符串,进一步检查时,所有字节都在缓冲区中。

上面是一个工作 IOCP 套接字的好例子,所以我将把它留在这里供人们参考。