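Problem description
I'm trying to add a TCP server socket, implemented with Spring Integration, to an existing Spring Boot application, but I've run into a problem that is driving me crazy... The client sends a message (a byte array) to the server and then times out. That's it. I don't get any exception from the server. It looks as if I had supplied the wrong port or something, but after checking I'm sure the port is correct.
Here is my annotation-based configuration class: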
import home.brew.server.socket.ServerSocketHandler;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.ip.dsl.Tcp;
import org.springframework.integration.ip.dsl.TcpInboundGatewaySpec;
import org.springframework.integration.ip.dsl.TcpServerConnectionFactorySpec;

@Log4j2
@Configuration
@EnableIntegration
public class TcpServerSocketConfiguration {

    // Port the server socket listens on, taken from the application properties.
    @Value("${socket.port}")
    private int socketPort;

    @Bean
    public IntegrationFlow server(ServerSocketHandler serverSocketHandler) {
        TcpServerConnectionFactorySpec connectionFactory =
                Tcp.netServer(socketPort)
                        .deserializer(new CustomSerializerDeserializer())
                        .serializer(new CustomSerializerDeserializer())
                        .soTcpNoDelay(true);
        TcpInboundGatewaySpec inboundGateway =
                Tcp.inboundGateway(connectionFactory);
        return IntegrationFlows
                .from(inboundGateway)
                .handle(serverSocketHandler::handleMessage)
                .get();
    }

    @Bean
    public ServerSocketHandler serverSocketHandler() {
        return new ServerSocketHandler();
    }
}
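I want to get receiving to work before I try to send a reply, which is why the configuration is kept minimal.
The following class is supposed to handle the messages received from the server socket: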
import lombok.extern.log4j.Log4j2;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;

@Log4j2
public class ServerSocketHandler {

    public String handleMessage(Message<?> message, MessageHeaders messageHeaders) {
        log.info(message.getPayload());
        // TODO implement something useful to process the incoming message here...
        return message.getPayload().toString();
    }
}
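The handler method above is never called! I searched Google for sample implementations or tutorials but didn't find anything that worked for me. I have already tried the implementations from these sites: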
- https://vispud.blogspot.com/2019/03/how-to-implement-simple-echo-socket.html
- https://docs.spring.io/spring-integration/docs/current/reference/html/ip.html#note-nio
- Spring Boot TCP Client
and several more sites... but nothing helped :-(
Update 1
I have implemented a custom serializer/deserializer:
import lombok.Data;
import lombok.extern.log4j.Log4j2;
import org.springframework.core.serializer.Deserializer;
import org.springframework.core.serializer.Serializer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

@Log4j2
@Data
public class CustomSerializerDeserializer implements Serializer<byte[]>, Deserializer<byte[]> {

    @Override
    public byte[] deserialize(InputStream inputStream) throws IOException {
        return inputStream.readAllBytes();
    }

    @Override
    public void serialize(byte[] object, OutputStream outputStream) throws IOException {
        outputStream.write(object);
    }
}
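After the client sends a message, the custom deserializer is invoked, but the content is always empty. I don't know why... The deserializer takes a long time to read all the bytes from the stream, and in the end the result is empty. The process keeps repeating, so I think I accidentally created an infinite loop...
Update 2
I captured the communication between the client and the server socket: it looks like I'm stuck in the handshake, so there is no payload...
So if anyone can help me with this, I'd be very grateful. If you need more information, just let me know.
Thanks!
Solution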
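How are you communicating with this server? By default, the connection factory is configured to require input terminated by CRLF (e.g. Telnet). If your client uses some other way to indicate the end of a message, you have to configure a different deserializer.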
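To make the framing point concrete, here is a minimal sketch in plain Java (no Spring involved) of what "input terminated by CRLF" means at the byte level. `CrLfFraming` and `readFrame` are hypothetical names used only for illustration; in Spring Integration this job is done by the default `ByteArrayCrLfSerializer`. On a real socket, `read()` blocks until more bytes arrive, so a client that never sends the terminator leaves the reader waiting.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Illustration only: reads one CRLF-terminated frame from a stream. */
public class CrLfFraming {

    /**
     * Returns the bytes before the next "\r\n", without the terminator.
     * Throws EOFException if the stream ends before a terminator arrives.
     */
    public static byte[] readFrame(InputStream in) throws IOException {
        ByteArrayOutputStream frame = new ByteArrayOutputStream();
        int previous = -1;
        int current;
        while ((current = in.read()) != -1) {
            if (previous == '\r' && current == '\n') {
                byte[] withCr = frame.toByteArray();
                // Drop the trailing '\r' that was buffered one step earlier.
                return Arrays.copyOf(withCr, withCr.length - 1);
            }
            frame.write(current);
            previous = current;
        }
        throw new EOFException("Stream closed before CRLF was received");
    }

    public static void main(String[] args) throws IOException {
        // Two frames on one stream, as a Telnet client would send them.
        InputStream in = new ByteArrayInputStream(
                "foo\r\nbar\r\n".getBytes(StandardCharsets.US_ASCII));
        System.out.println(new String(readFrame(in), StandardCharsets.US_ASCII));
        System.out.println(new String(readFrame(in), StandardCharsets.US_ASCII));
    }
}
```

A deserializer has to know where one message ends; whichever convention the client uses (terminator, length header, or closing the socket) must match what the server is configured to expect.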
Also, your method signature is incorrect; it should be:
    public String handleMessage(byte[] message, MessageHeaders messageHeaders) {
        String string = new String(message);
        System.out.println(string);
        return string.toUpperCase();
    }
This works fine for me with Telnet:
$ telnet localhost 1234
Trying ::1...
Connected to localhost.
Escape character is '^]'.
foo
FOO
^]
telnet> quit
Connection closed.
Here is a version that works with LF only (e.g. netcat):
    @Bean
    public IntegrationFlow server(ServerSocketHandler serverSocketHandler) {
        return IntegrationFlows.from(Tcp.inboundGateway(
                Tcp.netServer(1234)
                        .deserializer(TcpCodecs.lf())
                        .serializer(TcpCodecs.lf())))
                .handle(serverSocketHandler::handleMessage)
                .get();
    }
$ nc localhost 1234
foo
FOO
^C
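The same exchange can be driven from Java instead of `nc`. The sketch below is a plain-socket client that appends the LF terminator and reads one LF-terminated reply. `LfClient` and `sendLine` are hypothetical names, the 5-second read timeout is an arbitrary choice, and the code assumes the server answers each request with exactly one LF-terminated line.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Illustration only: sends one LF-terminated message and reads one reply line. */
public class LfClient {

    public static String sendLine(String host, int port, String message) throws IOException {
        try (Socket socket = new Socket(host, port)) {
            // Fail with a SocketTimeoutException instead of hanging if no reply arrives.
            socket.setSoTimeout(5000);

            OutputStream out = socket.getOutputStream();
            // The trailing '\n' is the end-of-message marker the LF codec waits for.
            out.write((message + "\n").getBytes(StandardCharsets.UTF_8));
            out.flush();

            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            // readLine() strips the line terminator from the reply.
            return reader.readLine();
        }
    }
}
```

With the flow above running on port 1234, `LfClient.sendLine("localhost", 1234, "foo")` should return the reply produced by the handler, assuming the handler from this answer is in place.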
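Well, after several days of analyzing and coding, I found the solution that works best for me for handling TCP socket communication with Spring Integration. For other developers who run into the same problem, here is what I have done so far.
This class contains the annotation-based TCP socket connection configuration that works for me: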
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.ip.IpHeaders;
import org.springframework.integration.ip.tcp.TcpInboundGateway;
import org.springframework.integration.ip.tcp.TcpOutboundGateway;
import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.web.context.request.RequestContextListener;

/**
 * Spring annotation based configuration
 */
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class TcpServerSocketConfiguration {

    public static final CustomSerializerDeserializer SERIALIZER = new CustomSerializerDeserializer();

    @Value("${socket.port}")
    private int socketPort;

    /**
     * Reply messages are routed to the connection only if the reply contains the ip_connectionId header
     * that was inserted into the original message by the connection factory.
     */
    @MessagingGateway(defaultRequestChannel = "toTcp")
    public interface Gateway {
        void send(String message, @Header(IpHeaders.CONNECTION_ID) String connectionId);
    }

    @Bean
    public MessageChannel fromTcp() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel toTcp() {
        return new DirectChannel();
    }

    @Bean
    public AbstractServerConnectionFactory serverCF() {
        TcpNetServerConnectionFactory serverCf = new TcpNetServerConnectionFactory(socketPort);
        serverCf.setSerializer(SERIALIZER);
        serverCf.setDeserializer(SERIALIZER);
        serverCf.setSoTcpNoDelay(true);
        serverCf.setSoKeepAlive(true);
        // serverCf.setSingleUse(true);
        // final int soTimeout = 5000;
        // serverCf.setSoTimeout(soTimeout);
        return serverCf;
    }

    @Bean
    public AbstractClientConnectionFactory clientCF() {
        TcpNetClientConnectionFactory clientCf = new TcpNetClientConnectionFactory("localhost", socketPort);
        clientCf.setSerializer(SERIALIZER);
        clientCf.setDeserializer(SERIALIZER);
        clientCf.setSoTcpNoDelay(true);
        clientCf.setSoKeepAlive(true);
        // clientCf.setSingleUse(true);
        // final int soTimeout = 5000;
        // clientCf.setSoTimeout(soTimeout);
        return clientCf;
    }

    @Bean
    public TcpInboundGateway tcpInGate() {
        TcpInboundGateway inGate = new TcpInboundGateway();
        inGate.setConnectionFactory(serverCF());
        inGate.setRequestChannel(fromTcp());
        inGate.setReplyChannel(toTcp());
        return inGate;
    }

    @Bean
    public TcpOutboundGateway tcpOutGate() {
        TcpOutboundGateway outGate = new TcpOutboundGateway();
        outGate.setConnectionFactory(clientCF());
        outGate.setReplyChannel(toTcp());
        return outGate;
    }
}
This class contains the custom serializer and deserializer:
import lombok.extern.log4j.Log4j2;
import org.jetbrains.annotations.NotNull;
import org.springframework.core.serializer.Deserializer;
import org.springframework.core.serializer.Serializer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

/**
 * A custom serializer for incoming and/or outgoing messages.
 */
@Log4j2
public class CustomSerializerDeserializer implements Serializer<byte[]>, Deserializer<byte[]> {

    @NotNull
    @Override
    public byte[] deserialize(InputStream inputStream) throws IOException {
        byte[] message = new byte[0];
        if (inputStream.available() > 0) {
            message = inputStream.readAllBytes();
        }
        log.debug("Deserialized message {}", new String(message, StandardCharsets.UTF_8));
        return message;
    }

    @Override
    public void serialize(@NotNull byte[] message, OutputStream outputStream) throws IOException {
        log.info("Serializing {}", new String(message, StandardCharsets.UTF_8));
        outputStream.write(message);
        outputStream.flush();
    }
}
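A note on framing, since the deserializer above relies on `available()` and `readAllBytes()`: according to the JDK documentation, `readAllBytes()` returns only once end of stream has been detected, which on a socket means the peer has closed or shut down its side of the connection. If the client can be changed, one common alternative is to prefix each message with its length. The following is a minimal sketch under that assumption, in plain Java; `LengthPrefixedCodec` is a hypothetical name and the 4-byte big-endian header is chosen only for illustration (Spring Integration ships comparable codecs, for example `TcpCodecs.lengthHeader4()`).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

/** Illustration only: frames each message as a 4-byte length followed by the payload. */
public class LengthPrefixedCodec {

    public static void write(byte[] payload, OutputStream out) throws IOException {
        DataOutputStream data = new DataOutputStream(out);
        data.writeInt(payload.length); // big-endian 4-byte header
        data.write(payload);
        data.flush();
    }

    public static byte[] read(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        int length = data.readInt(); // throws EOFException if the stream ends first
        if (length < 0) {
            throw new IOException("Negative length header: " + length);
        }
        byte[] payload = new byte[length];
        // Blocks until exactly 'length' bytes have arrived, then returns.
        data.readFully(payload);
        return payload;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        write("hello".getBytes(StandardCharsets.UTF_8), wire);
        write("world".getBytes(StandardCharsets.UTF_8), wire);

        InputStream in = new ByteArrayInputStream(wire.toByteArray());
        System.out.println(new String(read(in), StandardCharsets.UTF_8));
        System.out.println(new String(read(in), StandardCharsets.UTF_8));
    }
}
```

With this kind of framing the reader knows how many bytes belong to a message without waiting for the connection to close, so several messages can share one connection.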
In the following class you can implement some business logic to handle incoming...
import lombok.extern.log4j.Log4j2;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;

@Log4j2
@Component
@MessageEndpoint
public class ClientSocketHandler {

    @ServiceActivator(inputChannel = "toTcp")
    public byte[] handleMessage(byte[] msg) {
        // TODO implement some business logic here
        return msg;
    }
}
...and outgoing messages.
import lombok.extern.log4j.Log4j2;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;

@Log4j2
@Component
@MessageEndpoint
public class ClientSocketHandler {

    @ServiceActivator(inputChannel = "toTcp")
    public byte[] handleMessage(byte[] msg) {
        // implement some business logic here
        return msg;
    }
}
Hope that helps. ;-)