C ++套接字上的异步I / O

问题描述

我正在编写一个多线程套接字应用程序(用于Windows),但是,在连接多个客户端时遇到问题。我可以将消息从服​​务器发送到客户端,但是,我只能将消息从一个客户端发送到服务器。其他客户端无法将消息发送到服务器。我在Google上搜索了一些东西,发现重叠/异步I / O是必经之路。只是有一个问题,我不知道如何实现,所以我基本上是在问我将如何去做或者这种方法是否错误

我想让RecieveFromClients()函数成为异步函数

谢谢。

这是我的代码

main.cpp

#include "server.h"

int main()
{
    Server server;

    server.StartServer();

    return 0;
}

Server.h

#include <WinSock2.h>
#include <WS2tcpip.h>
#include <iphlpapi.h>
#include <stdio.h>

#include <string>
#include <iostream>
#include <vector>
#include <thread>

#pragma comment(lib,"Ws2_32.lib")

#define DEFAULT_PORT 4566
#define BACKLOG 10

class Server
{
public:
    Server();
    ~Server();
    int StartServer();
private:
    int Socket();
    int Bind();
    int Listen();
    void AcceptConnections();
    int StopServer();
    int CloseClientSocket(int client_num);

    void GetClients(int server_socket,std::vector<int>* clients,std::vector<sockaddr_in> *client_addrs);

    void SendToClients();
    void RecieveFromClients(int id);
private:
    sockaddr_in server_addr;
    int connected_clients,counted_clients;
    int server_socket;
    int result;
    int msgSize;
    std::vector<int> clients;
    std::vector<sockaddr_in> client_addrs;
private:
    std::thread get_clients;
    std::thread send_messages;
    std::thread recieve_messages;
};

Server.cpp

#include "server.h"

Server::Server()
    :
    connected_clients(0),counted_clients(0),server_socket(0),result(0)
{
    ZeroMemory(&server_addr,0);

    WSAData wsaData;
    if (WSAStartup(MAKEWORD(2,2),&wsaData) != 0)
    {
        printf("wsastartip falied\n");
    }
}

Server::~Server()
{
    WSACleanup();
}

int Server::StartServer()
{
    if (Socket() != 0)
    {
        return 1;
    }

    if (Bind() != 0)
    {
        return 1;
    }

    if (Listen() != 0)
    {
        return 1;
    }

    AcceptConnections();

    return 0;
}

int Server::Socket()
{
    server_socket = socket(AF_INET,SOCK_STREAM,0);
    if (server_socket == INVALID_SOCKET)
    {
        std::cout << "Failed to create socket" << std::endl;
        return 1;
    }

    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(DEFAULT_PORT);
    server_addr.sin_addr.S_un.S_addr = INADDR_ANY;

    return 0;
}

int Server::Bind()
{
    result = bind(server_socket,(sockaddr*)&server_addr,sizeof(server_addr));
    if (result == SOCKET_ERROR)
    {
        std::cout << "Failed to bind socket" << std::endl;
        return 1;
    }

    return 0;
}

int Server::Listen()
{
    result = listen(server_socket,BACKLOG);
    if (result == SOCKET_ERROR)
    {
        std::cout << "Listening Failed" << std::endl;
        return 1;
    }

    return 0;
}

void Server::AcceptConnections()
{
    get_clients = std::thread(&Server:: GetClients,this,server_socket,&clients,&client_addrs);
    send_messages = std::thread(&Server::SendToClients,this);
    recieve_messages = std::thread(&Server::RecieveFromClients,counted_clients);
    get_clients.join();
    send_messages.join();
    recieve_messages.join();
}

int Server::StopServer()
{
    std::terminate();

    for (int client : clients)
    {
        result = closesocket(client);
        if (result == SOCKET_ERROR)
        {
            std::cout << "Failed to close client socket" << std::endl;
            return 1;
        }
    }

    return 0;
}

int Server::CloseClientSocket(int client_num)
{
    result = closesocket(clients[client_num]);
    if (result == SOCKET_ERROR)
    {
        std::cout << "Failed to close client socket" << std::endl;
        return 1;
    }
    return 0;
}

void Server::GetClients(int server_socket,std::vector<sockaddr_in>* client_addrs)
{
    
    while(true)
    {
        sockaddr_in client_addr = { 0 };
        socklen_t client_addrstrlen = sizeof(client_addr);
        int client;

        client = accept(server_socket,(sockaddr*)&client_addr,&client_addrstrlen);

        clients->push_back(client);
        client_addrs->push_back(client_addr);
        ++connected_clients;

        char ip[INET_ADDRSTRLEN] = "";
        char port[100] = "";
        inet_ntop(AF_INET,&client_addr.sin_addr,ip,sizeof(ip));
        std::cout << "Client connected from " << ip << ":" << 
            client_addr.sin_port << std::endl;
    }
}

void Server::SendToClients()
{
    std::string msg;
    do
    {
        msg.clear();
        getline(std::cin,msg);
        if (msg.size() > 255)
        {
            std::cout << "Message must be less than 256 bytes"
                << std::endl;
            continue;
        }

        for (int client : clients)
        {
            int size;
            size = send(client,msg.data(),msg.size(),0);
            if (size == SOCKET_ERROR)
            {
                std::cout << "Failed to send message to client"
                    << std::endl;
            }
        }
    } while (msg != "exit");

    if (StopServer() != 0)
    {
        std::cout << "Failed to close client sockets" << std::endl;
    }
}

void Server::RecieveFromClients(int id)
{
    std::vector<char> msgBuffer(256);
    do
    {
        
        if (connected_clients > 0)
        {
            msgBuffer.clear();
            msgBuffer.resize(256);
            char ip[INET_ADDRSTRLEN];
            inet_ntop(AF_INET,&client_addrs[id].sin_addr,sizeof(ip));

            if (msgSize = recv(clients[id],msgBuffer.data(),msgBuffer.size(),0) > 0)
            {
                std::cout << ip << ": ";
                for (char c : msgBuffer)
                {
                    if (c != 0)
                    {
                        std::cout << c;
                    }
                }
                std::cout << std::endl;
            }
            else
            {
                if (msgSize == SOCKET_ERROR)
                {
                    std::cout << "Failed to recieve data" << std::endl;
                    break;
                }
                else if (clients[id] > 0)
                {
                    std::cout << "Client " << ip << " has disconnected" << std::endl;
                    CloseClientSocket(0);
                    break;
                }
            }
        }
        else
        {
            continue;
        }
    } while (true);
}

解决方法

使用重叠的I / O将对您的代码设计造成相当大的变化。但幸运的是,有一个更简单的解决方案。在您的 RecieveFromClients()方法中,您可以使用select()来确定哪些客户端套接字实际上具有待读取的数据,然后再尝试从中读取。您正在使用阻塞套接字,因此对recv()的调用将阻塞调用线程,直到接收到数据为止,因此,在您准备好实际读取内容之前,您不希望执行阻塞读取。

此外,由于您没有为每个接受的客户端创建新线程,因此正在使用id的{​​{1}}参数和RecieveFromClients()的{​​{1}}参数错误地将其完全移除。接收功能应在连接的客户端列表上运行一个循环。并且close函数应采用特定的套接字句柄关闭。

话虽这么说,代码的另一个主要设计问题是跨多个线程共享变量和容器,但没有同步访问它们中的任何一个。从长远来看,这将给您带来大麻烦。