Spring Integration TCP 服务器无法接受超过5个的并发连接

问题描述

我现在正在使用以下版本的Spring Boot和Spring集成。

spring.boot.version 2.3.4.RELEASE
spring-integration  5.3.2.RELEASE

我的需求是实现TCP客户端与服务器之间的通信,为此我使用了Spring Integration。这个原型(spike)在客户端与服务器之间只有单个连接时工作正常,在同时存在5个并发客户端连接时也工作正常。

但是,当我把并发客户端连接数增加到5个以上(任意数量)时,就无法正常工作了:TCP服务器始终只接受5个连接。

我已经按照 @Gary Russell 在一个类似需求的回答中的建议使用了“ThreadAffinityClientConnectionFactory”,但是仍然无法正常工作。

下面是我目前的代码。

/**
 * Spring Integration wiring for the TCP endpoints of this application:
 * an inbound gateway backed by a server connection factory, an outbound
 * gateway backed by a thread-affinity client connection factory, and a
 * thread pool bean.
 */
@Slf4j
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class SocketConfig {

    @Value("${socket.host}")
    private String clientSocketHost;

    @Value("${socket.port}")
    private Integer clientSocketPort;

    /**
     * Size of the server socket's connection backlog. The connection factory
     * default is 5, which is too small when more than five clients connect at
     * the same moment. Optional property; falls back to 50 when not configured.
     */
    @Value("${socket.backlog:50}")
    private int serverSocketBacklog;

    /**
     * Outbound gateway used to send requests over TCP.
     *
     * @param connectionFactory the client connection factory bean; within this
     *                          configuration that is the one built by {@link #clientCF()}
     * @return the configured outbound gateway
     */
    @Bean
    public TcpOutboundGateway tcpOutGate(AbstractClientConnectionFactory connectionFactory) {
        TcpOutboundGateway gate = new TcpOutboundGateway();
        gate.setConnectionFactory(connectionFactory);
        return gate;
    }

    /**
     * Inbound gateway that hands every message received on the server socket
     * to the {@link #fromTcp()} channel.
     *
     * @param connectionFactory the server connection factory bean
     * @return the configured inbound gateway
     */
    @Bean
    public TcpInboundGateway tcpInGate(AbstractServerConnectionFactory connectionFactory) {
        TcpInboundGateway inGate = new TcpInboundGateway();
        inGate.setConnectionFactory(connectionFactory);
        inGate.setRequestChannel(fromTcp());
        return inGate;
    }

    /**
     * Channel that carries requests arriving through the inbound gateway.
     *
     * @return a new direct channel
     */
    @Bean
    public MessageChannel fromTcp() {
        return new DirectChannel();
    }

    /**
     * Client connection factory for outgoing requests. Wraps a single-use
     * {@code TcpNetClientConnectionFactory} that targets the configured host
     * and the port of the server connection factory.
     *
     * @return the thread-affinity client connection factory
     */
    @Bean
    public ThreadAffinityClientConnectionFactory clientCF() {
        // NOTE(review): no serializer/deserializer is set here while serverCF()
        // installs custom ones - confirm the two sides agree on framing.
        TcpNetClientConnectionFactory delegate =
                new TcpNetClientConnectionFactory(clientSocketHost, serverCF().getPort());
        delegate.setSingleUse(true);
        return new ThreadAffinityClientConnectionFactory(delegate);
    }

    /**
     * Server connection factory for incoming requests. Uses the custom
     * serializer/deserializer pair, single-use connections, and an enlarged
     * connection backlog.
     *
     * @return the server connection factory
     */
    @Bean
    public AbstractServerConnectionFactory serverCF() {
        log.info("Server Connection Factory");
        TcpNetServerConnectionFactory serverFactory = new TcpNetServerConnectionFactory(clientSocketPort);
        serverFactory.setSerializer(new CustomSerializer());
        serverFactory.setDeserializer(new CustomDeserializer());
        serverFactory.setSingleUse(true);
        // The default backlog of 5 is what limited the server to five
        // simultaneous connection attempts.
        serverFactory.setBacklog(serverSocketBacklog);
        return serverFactory;
    }

    /**
     * Thread pool bean: 50 core threads, up to 100 threads, a queue of 50
     * tasks, and idle threads (core threads included) released after 120 seconds.
     *
     * @return the task executor
     */
    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(50);
        executor.setMaxPoolSize(100);
        executor.setQueueCapacity(50);
        executor.setAllowCoreThreadTimeOut(true);
        executor.setKeepAliveSeconds(120);
        return executor;
    }

}

有人有多个并发的Tcp客户端连接超过5个的问题吗?

谢谢

客户端代码:

@Component
@Slf4j
@RequiredArgsConstructor
public class ScheduledTaskService {

    // Timeout in milliseconds
    private static final int SOCKET_TIME_OUT = 18000;
    private static final int BUFFER_SIZE = 32000;
    private static final int ETX = 0x03;
    private static final String HEADER = "ABCDEF             ";
    private static final String data = "FIXED DARATA"
    private final AtomicInteger atomicInteger = new AtomicInteger();

    @Async
    @Scheduled(fixedDelay = 100000)
    public void sendDataMessage() throws IOException,InterruptedException {
        int numberOfRequests = 10;

        Callable<String> executeMultipleSuccessfulRequestTask = () -> socketSendNReceive();

        final Collection<Callable<String>> callables = new ArrayList<>();
        IntStream.rangeClosed(1,numberOfRequests).forEach(i-> {
            callables.add(executeMultipleSuccessfulRequestTask);
        });
        ExecutorService executorService = Executors.newFixedThreadPool(numberOfRequests);

        List<Future<String>> taskFutureList = executorService.invokeAll(callables);
        List<String> strings = taskFutureList.stream().map(future -> {
            try {
                return future.get(20000,TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            return "";
        }).collect(Collectors.toList());

        strings.forEach(string -> log.info("Message received from the server: {} ",string));

    }

    public String socketSendNReceive() throws IOException{
        int requestCounter = atomicInteger.incrementAndGet();

        String host = "localhost";
        int port = 8000;

        Socket socket = new Socket();
        InetSocketAddress address = new InetSocketAddress(host,port);
        socket.connect(address,SOCKET_TIME_OUT);
        socket.setSoTimeout(SOCKET_TIME_OUT);

        //Send the message to the server
        OutputStream os = socket.getOutputStream();
        BufferedOutputStream bos = new BufferedOutputStream(os);

        bos.write(HEADER.getBytes());
        bos.write(data.getBytes());
        bos.write(ETX);
        bos.flush();
//        log.info("Message sent to the server : {} ",envio);

        //Get the return message from the server
        InputStream is = socket.getInputStream();
        String response =  receber(is);
        log.info("Received response");
        return response;
    }

    private String receber(InputStream in) throws IOException {
        final StringBuffer stringBuffer = new StringBuffer();
        int readLength;
        byte[] buffer;
        buffer = new byte[BUFFER_SIZE];
        do {
            if(Objects.nonNull(in)) {
                log.info("Input Stream not null");
            }
            readLength = in.read(buffer);
            log.info("readLength : {}  ",readLength);
            if(readLength > 0){
                stringBuffer.append(new String(buffer),readLength);
                log.info("String ******");
            }
        } while (buffer[readLength-1] != ETX);
        buffer = null;
        stringBuffer.deleteCharAt(resposta.length()-1);
        return stringBuffer.toString();
    }
}

解决方法

由于要同时打开所有连接,因此需要增加服务器连接工厂上的backlog属性。

默认为5。

/**
 * The number of sockets in the connection backlog. Default 5;
 * increase if you expect high connection rates.
 * @param backlog The backlog to set.
 */
public void setBacklog(int backlog) {

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...