IO 完成端口返回 NULL 完成密钥

问题描述

第一次使用 IO 完成端口。我遇到了一个问题,即 GetQueuedCompletionStatus 为 Completion Key 返回 Null,我用它来传递带有代码其他部分句柄的数据结构。

GetQueuedCompletionStatus 似乎触发了接收到的消息,否则就好了。

我试图只包含涉及 IO 完成端口的代码

数据结构:

    typedef struct _THREAD_MESSAGE
{
    mutex cmd_mtx;
    string command;
} THREAD_MESSAGE,* LPTHREAD_MESSAGE;

typedef struct _LISTEN_SOCKET_DATA
{
    SOCKET Socket;
    int    Port;
    HANDLE hAcceptEvent;
    HANDLE IOCP;
    VOID* MessageProcessor;
    ConfigHandler* CfgHandle;
    // Other information useful to be associated with the handle
} LISTEN_SOCKET_DATA,* LPLISTEN_SOCKET_DATA;

typedef struct _CONNECTED_SOCKET_DATA
{
    SOCKET Socket;
    int Port;
    HANDLE IOCP;
    VOID* MessageProcessor;
    ConfigHandler* CfgHandle;
} CONNECTED_SOCKET_DATA,* LPCONNECTED_SOCKET_DATA;

#define OPERATION_TYPE_UNKNowN      0
#define OPERATION_TYPE_SEND         1
#define OPERATION_TYPE_RECV         2
typedef struct
{
    OVERLAPPED* Overlapped;
    CHAR Buffer[DATA_BUFSIZE];
    int BufferLen;
    int OperationType;
    string PacketName;
} PER_IO_OPERATION_DATA,* LPPER_IO_OPERATION_DATA;

完成端口初始化为:

m_CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0);

听众:

//Thread for handling Listener sockets and Accepting connections
DWORD ListenThread(LPVOID lpParam)
{
    LPLISTEN_SOCKET_DATA pSocketData = (LPLISTEN_SOCKET_DATA)(lpParam);
    WSANETWORKEVENTS NetworkEvents;
    DWORD dwRet;
    SOCKADDR_IN NewSockAddr;
    SOCKET      NewSocket;
    int         nLen;

    while (true) //run forever
    {
        //Wait for event
        dwRet = WSAWaitForMultipleEvents(1,&(pSocketData->hAcceptEvent),false,100,false);

        //nothing happened,back to top
        if (dwRet == WSA_WAIT_TIMEOUT)
            continue;

        //We got a event,find out which one.
        int nRet = WSAEnumNetworkEvents(pSocketData->Socket,pSocketData->hAcceptEvent,&NetworkEvents);
        if (nRet == SOCKET_ERROR)
        {
            wprintf(L"WSAEnumNetworkEvents error %ld\n",WSAGetLastError());
            break;
        }

        //We got a Accept event
        if (NetworkEvents.lNetworkEvents & FD_ACCEPT)
        {
            //Check for errors
            if (NetworkEvents.iErrorCode[FD_ACCEPT_BIT] == 0)
            {
                
                // Accept new connection
                nLen = sizeof(SOCKADDR_IN);
                NewSocket = accept(pSocketData->Socket,(LPSOCKADDR)&NewSockAddr,&nLen);
                if (NewSocket == SOCKET_ERROR)
                {
                    wprintf(L"accept() error %ld\n",WSAGetLastError());
                    break;
                }

                wprintf(L"Accepted Connection %ld",NewSockAddr.sin_addr.S_un.S_addr);

                //Set new connection as TCP connection,No Delay
                const char chOpt = 1;
                int nErr = setsockopt(NewSocket,IPPROTO_TCP,TCP_NODELAY,&chOpt,sizeof(char));
                if (nErr == -1)
                {
                    wprintf(L"setsockopt() error %ld\n",WSAGetLastError());
                    break;
                }


                LPCONNECTED_SOCKET_DATA ConnectedSocketData = new CONNECTED_SOCKET_DATA;

                ZeroMemory(ConnectedSocketData,sizeof(CONNECTED_SOCKET_DATA));

                ConnectedSocketData->Socket = NewSocket;
                ConnectedSocketData->Port = pSocketData->Port;
                ConnectedSocketData->IOCP = pSocketData->IOCP;
                ConnectedSocketData->CfgHandle = pSocketData->CfgHandle;
                ConnectedSocketData->MessageProcessor = pSocketData->MessageProcessor;

                //Add the new socket to the completion port,message from the socker will be queued up for proccessing by worker threads.
                if (CreateIoCompletionPort((HANDLE)NewSocket,pSocketData->IOCP,(DWORD)ConnectedSocketData,0) == NULL)
                {
                    wprintf(L"CreateIOCompletionPort error %ld\n",WSAGetLastError());
                    delete ConnectedSocketData;
                    ConnectedSocketData = NULL;
                    closesocket(NewSocket);
                    break;
                }                

                //Set the PerIOData,will be used at completion time
                LPPER_IO_OPERATION_DATA PerIOOperationData = new PER_IO_OPERATION_DATA;
                PerIOOperationData->BufferLen = 0;
                PerIOOperationData->OperationType = OPERATION_TYPE_RECV;
                DWORD RecvBytes = 0;
                DWORD Flags = 0;
                WSABUF DataBuf;
                DataBuf.len = DATA_BUFSIZE;
                DataBuf.buf = PerIOOperationData->Buffer;
                PerIOOperationData->Overlapped = new OVERLAPPED;
                ZeroMemory(PerIOOperationData->Overlapped,sizeof(OVERLAPPED));

                //Kick off the first Recv request for the Socket,will be handled by the completion Queue.
                if (WSARecv(NewSocket,&DataBuf,1,&RecvBytes,&Flags,(PerIOOperationData->Overlapped),NULL) == SOCKET_ERROR)
                {
                    wprintf(L"WSARecv error %ld\n",WSAGetLastError());
                    return 0;
                }
            }
            else
            {
                wprintf(L"UnkNown network event error %ld\n",WSAGetLastError());
                break;
            }
        }
    }
}

工作线程,当它尝试使用 ConnectedSocketData 时,由于结构为空而崩溃:

    // Worker thread,processes IOCP messages.
DWORD ServerWorkerThread(LPVOID lpParam)
{
    HANDLE CompletionPort = (HANDLE)lpParam;
    DWORD BytesTransferred = 0;
    OVERLAPPED* lpOverlapped = NULL;
    LPCONNECTED_SOCKET_DATA ConnectedSocketData = NULL;
    LPPER_IO_OPERATION_DATA PerIoData = NULL;
    DWORD Flags = 0;
    WSABUF DataBuf;
    DWORD RecvBytes = 0;
    int DestinationAddress = 0;

    while (TRUE)//run forever
    {
        //Check for new message
        if (GetQueuedCompletionStatus(CompletionPort,&BytesTransferred,(PULONG_PTR)&ConnectedSocketData,(LPOVERLAPPED*)&lpOverlapped,INFINITE) == 0)
        {
            DWORD Err = GetLastError();
            if (Err != WAIT_TIMEOUT)
            {
                printf("GetQueuedCompletionStatus() Failed with error %d\n",Err);

                if (closesocket(ConnectedSocketData->Socket) == SOCKET_ERROR)
                {
                    printf("closesocket() Failed with error %d\n",WSAGetLastError());
                    return 0;
                }

                GlobalFree(ConnectedSocketData);
            }
            continue;
        }

        // retrieve IO data
        PerIoData = CONTAINING_RECORD(lpOverlapped,PER_IO_OPERATION_DATA,Overlapped);

        vector<SiteData>::iterator SiteDataIterator;
        vector<InstrumentData>::iterator InstrumentDataIterator;

        for (SiteDataIterator = ConnectedSocketData->CfgHandle->SiteConnections.begin();
            SiteDataIterator != ConnectedSocketData->CfgHandle->SiteConnections.end();
            SiteDataIterator++)
        {
            if (SiteDataIterator->Port == ConnectedSocketData->Port)
            {
                break;
            }
        }

知道为什么 IOCP 没有传递完成密钥吗?

解决方法

知道为什么 IOCP 没有传递完成密钥吗?

当然它会准确地传回您传递给 CreateIoCompletionPort 的内容和指向 OVERLAPPED 的 I/O 指针

但一开始

CreateIoCompletionPort((HANDLE)NewSocket,pSocketData->IOCP,(DWORD)ConnectedSocketData,0)

是错误的 - 这里只使用了 ConnectedSocketData 的低 32 位,必须是

CreateIoCompletionPort((HANDLE)NewSocket,(DWORD_PTR)ConnectedSocketData,0)

那么,你对PER_IO_OPERATION_DATA的定义

typedef struct
{
    OVERLAPPED* Overlapped;
    CHAR Buffer[DATA_BUFSIZE];
    int BufferLen;
    int OperationType;
    string PacketName;
} PER_IO_OPERATION_DATA,* LPPER_IO_OPERATION_DATA;

始终是严重错误。必须

typedef struct
{
    OVERLAPPED Overlapped;
    CHAR Buffer[DATA_BUFSIZE];
    int BufferLen;
    int OperationType;
    string PacketName;
} PER_IO_OPERATION_DATA,* LPPER_IO_OPERATION_DATA;

by Overlapped(当它定义为 OVERLAPPED* Overlapped 时)不可能收回 PER_IO_OPERATION_DATA 的地址。但是从 &Overlapped (当它定义为 OVERLAPPED Overlapped 时)已经可能了。

我最好做下一个定义

struct PER_IO_OPERATION_DATA : public OVERLAPPED
{
    CHAR Buffer[DATA_BUFSIZE];
    int BufferLen;
    int OperationType;
    string PacketName;
};

并使用

WSARecv(NewSocket,&DataBuf,1,&RecvBytes,&Flags,PerIOOperationData,NULL)

PerIoData = static_cast<PER_IO_OPERATION_DATA*>(lpOverlapped);

GetQueuedCompletionStatus 返回 false 时,您错误地处理了案例 - 在这种情况下您泄漏了 PerIoData

这只是您代码中的主要错误,与您的直接问题有关