如何在Spring Boot中创建有效的TCP Server套接字以及如何处理传入消息?

问题描述

我试图在已经存在的现有Spring Boot应用程序中实现具有Spring集成的TCP服务器套接字,但是我遇到了一个问题,这个问题使我发疯... 客户端正在向服务器发送消息(字节数组)并超时。而已。 我没有从服务器收到任何异常。似乎我提供了错误的端口或东西,但是在检查端口之后,我确定它是正确的端口。

这是我基于注释的配置类:

import home.brew.server.socket.ServerSocketHandler;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.ip.dsl.Tcp;

@Log4j2
@Configuration
@EnableIntegration
public class TcpServerSocketConfiguration {

    @Value("${socket.port}")
    private int serverSocketPort;

    @Bean
    public IntegrationFlow server(ServerSocketHandler serverSocketHandler) {
        TcpServerConnectionFactorySpec connectionFactory = 
            Tcp.netServer(socketPort) 
              .deserializer(new CustomSerializerDeserializer())
              .serializer(new CustomSerializerDeserializer())
              .soTcpNoDelay(true);

        TcpInboundGatewaySpec inboundGateway = 
           Tcp.inboundGateway(connectionFactory);

        return IntegrationFlows
         .from(inboundGateway)
         .handle(serverSocketHandler::handleMessage)
         .get();
    }

    @Bean
    public ServerSocketHandler serverSocketHandler() {
        return new ServerSocketHandler();
    }
}

我想在尝试发送答案之前使接收功能正常工作,所以这就是为什么要进行最小配置的原因。

下面的类应该处理从服务器套接字接收到的消息

import lombok.extern.log4j.Log4j2;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;

@Log4j2
public class ServerSocketHandler {

    public String handleMessage(Message<?> message,MessageHeaders messageHeaders) {
        log.info(message.getPayload());
        // TODO implement something useful to process the incoming message here...
        return message.getPayload().toString();
    } 
}

上面的处理程序方法从未被调用过! 我在Google上搜索了一些示例实现或教程,但没有找到对我有用的任何东西。 我已经尝试过这些网站的实现:

  1. https://vispud.blogspot.com/2019/03/how-to-implement-simple-echo-socket.html
  2. https://docs.spring.io/spring-integration/docs/current/reference/html/ip.html#note-nio
  3. Spring Boot TCP Client

还有更多站点...但是没有任何帮助:-(

更新1

我已经实现了一个自定义的序列化器/反序列化器:

import lombok.Data;
import lombok.extern.log4j.Log4j2;
import org.springframework.core.serializer.Deserializer;
import org.springframework.core.serializer.Serializer;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

@Log4j2
@Data
public class CustomSerializerDeserializer implements Serializer<byte[]>,Deserializer<byte[]> {


@Override
public byte[] deserialize(InputStream inputStream) throws IOException {
    return inputStream.readAllBytes();
}

@Override
public void serialize(byte[] object,OutputStream outputStream) throws IOException {
    outputStream.write(object);
}
}

客户端发送消息后,将调用自定义序列化程序,但内容始终为空。我不知道为什么。...串行器需要很多时间才能从流中读取所有字节,最后它是空的。该过程一直在重复,所以我认为我偶然创建了一个无限循环...

更新2

我已经捕获了客户端和服务器套接字之间的通信: 看来我被卡在了握手中,因此没有有效载荷...

Capture communication between client and server socket

因此,如果有人可以帮助我解决这个问题,我将非常感激,如果您需要更多信息,请告诉我。

谢谢!

解决方法

您如何与此服务器通信?默认情况下,连接工厂配置为要求输入由CRLF终止(例如Telnet)。如果客户端使用其他方式指示消息结束,则必须配置其他反序列化器。

此外,您的方法签名不正确;应该是:

public String handleMessage(byte[] message,MessageHeaders messageHeaders) {
    String string = new String(message);
    System.out.println(string);
    return string.toUpperCase();
}

这对Telnet来说对我来说很好:

$ telnet localhost 1234
Trying ::1...
Connected to localhost.
Escape character is '^]'.
foo
FOO
^]
telnet> quit
Connection closed.

这是仅适用于LF(例如netcat)的版本:

@Bean
public IntegrationFlow server(ServerSocketHandler serverSocketHandler) {
    return IntegrationFlows.from(Tcp.inboundGateway(
            Tcp.netServer(1234)
                .deserializer(TcpCodecs.lf())
                .serializer(TcpCodecs.lf())))
            .handle(serverSocketHandler::handleMessage)
            .get();
}
$ nc localhost 1234
foo
FOO
^C
,

好吧,经过几天的分析和编码,我找到了使用Spring集成处理TCP套接字通信的最佳解决方案。对于同样遇到同样问题的其他开发人员。这是我到目前为止所做的。

此类包含-对于我来说-基于注释的TCP套接字连接配置

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.ip.IpHeaders;
import org.springframework.integration.ip.tcp.TcpInboundGateway;
import org.springframework.integration.ip.tcp.TcpOutboundGateway;
import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.web.context.request.RequestContextListener;

/**
 * Spring annotation based configuration
 */
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class TcpServerSocketConfiguration {

    public static final CustomSerializerDeserializer SERIALIZER = new CustomSerializerDeserializer();
    @Value("${socket.port}")
    private int socketPort;

    /**
     * Reply messages are routed to the connection only if the reply contains the ip_connectionId header
     * that was inserted into the original message by the connection factory.
     */
    @MessagingGateway(defaultRequestChannel = "toTcp")
    public interface Gateway {
        void send(String message,@Header(IpHeaders.CONNECTION_ID) String connectionId);
    }

    @Bean
    public MessageChannel fromTcp() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel toTcp() {
        return new DirectChannel();
    }

    @Bean
    public AbstractServerConnectionFactory serverCF() {
        TcpNetServerConnectionFactory serverCf = new TcpNetServerConnectionFactory(socketPort);
        serverCf.setSerializer(SERIALIZER);
        serverCf.setDeserializer(SERIALIZER);
        serverCf.setSoTcpNoDelay(true);
        serverCf.setSoKeepAlive(true);
        // serverCf.setSingleUse(true);
        // final int soTimeout = 5000;
        // serverCf.setSoTimeout(soTimeout);
        return serverCf;
    }

    @Bean
    public AbstractClientConnectionFactory clientCF() {

        TcpNetClientConnectionFactory clientCf = new TcpNetClientConnectionFactory("localhost",socketPort);
        clientCf.setSerializer(SERIALIZER);
        clientCf.setDeserializer(SERIALIZER);
        clientCf.setSoTcpNoDelay(true);
        clientCf.setSoKeepAlive(true);
        // clientCf.setSingleUse(true);
        // final int soTimeout = 5000;
        // clientCf.setSoTimeout(soTimeout);
        return clientCf;
    }

    @Bean
    public TcpInboundGateway tcpInGate() {
        TcpInboundGateway inGate = new TcpInboundGateway();
        inGate.setConnectionFactory(serverCF());
        inGate.setRequestChannel(fromTcp());
        inGate.setReplyChannel(toTcp());
        return inGate;
    }

    @Bean
    public TcpOutboundGateway tcpOutGate() {
        TcpOutboundGateway outGate = new TcpOutboundGateway();
        outGate.setConnectionFactory(clientCF());
        outGate.setReplyChannel(toTcp());
        return outGate;
    }

此类包含自定义的序列化器和反序列化器

import lombok.extern.log4j.Log4j2;
import org.jetbrains.annotations.NotNull;
import org.springframework.core.serializer.Deserializer;
import org.springframework.core.serializer.Serializer;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import java.nio.charset.StandardCharsets;

/**
* A custom serializer for incoming and/or outcoming messages.
*/
@Log4j2
public class CustomSerializerDeserializer implements Serializer<byte[]>,Deserializer<byte[]> {

    @NotNull
    @Override
    public byte[] deserialize(InputStream inputStream) throws IOException {
        byte[] message = new byte[0];
        if (inputStream.available() > 0) {
            message = inputStream.readAllBytes();
        }
        log.debug("Deserialized message {}",new String(message,StandardCharsets.UTF_8));
        return message;
    }

    @Override
    public void serialize(@NotNull byte[] message,OutputStream outputStream) throws IOException {
        log.info("Serializing {}",StandardCharsets.UTF_8));
        outputStream.write(message);
        outputStream.flush();
    }
}

在以下课程中,您可以实现一些商务逻辑来处理传入的...

import lombok.extern.log4j.Log4j2;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;

@Log4j2
@Component
@MessageEndpoint
public class ClientSocketHandler {

    @ServiceActivator(inputChannel = "toTcp")
    public byte[] handleMessage(byte[] msg) {
        // TODO implement some buisiness logic here
        return msg;
    }
}

和外发邮件。

import lombok.extern.log4j.Log4j2;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;

@Log4j2
@Component
@MessageEndpoint
public class ClientSocketHandler {

    @ServiceActivator(inputChannel = "toTcp")
    public byte[] handleMessage(byte[] msg) {
        // implement some business logic here
        return msg;
    }
}

希望有帮助。 ;-)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...