BufferedReader readLine阻塞直到缓冲区已满

问题描述

我正在编写一个客户端/服务器对应用程序。服务器运行多个线程来收集数据并将其添加到BlockingQueue。套接代码在队列中循环,并将找到的所有数据发送到客户端。数据是一个字符串,我附加了一个行分隔符,以便客户端可以使用BufferedReader.readLine()读取它。

我的问题是,与其等到可用的每一行上都没有返回readLine(),它要等到整个缓冲区都满后才吐出缓冲区中的所有完整行。使用认的8K缓冲区,这意味着我以8K块为单位通过客户端获取数据,这是非常不可取的。我已经附上了代表这个的MRE代码。通过登录我的实际应用程序,我已经确认BufferedWriter正在从队列中获取可用的数据,但是老实说,我不知道延迟是在发送端之后发生的,还是真的在阅读方面。如果您运行MRE,客户端将一次显示大约170行数据。

我已经在网上搜索了几天这种现象,我可以找到一个类似问题的摘要表明,这可能与底层的InputStreamReader和/或StreamDecoder有关,但是这种现象已经开始超出我的专业知识。 (请参见this link

所以我的问题是我是否正确地实现了BufferedReader,以及如何解决所遇到的问题,以便我在没有不必要的延迟的情况下获得每条进入的行。

package serverTest;

import java.util.concurrent.BlockingQueue;

public class ServerTest {

    public static void main(String[] args) {

        int port = 54321;
        ServerSocketComms server = new ServerSocketComms(port);
        BlockingQueue<String> queue = server.getQueue();
        new Thread(server).start();
        
        ClientSocketComms client = new ClientSocketComms("localhost",port);
        new Thread(client).start();
        
        for(int i = 0; i < 1000; i++) { // should give about 10 seconds of output
            try {
                queue.put("" + i + " - All work and no play makes Jack a dull boy");
                // Slow things down enough to show what's happening
                Thread.sleep(10);
                // 48 characters should fill the 8K buffer in approximately 2 seconds
            } catch (InterruptedException e) {
                e.printstacktrace();
            }
        }
    }

}
package serverTest;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ServerSocketComms implements Runnable {

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final int port;
    
    public ServerSocketComms(int port) {
        this.port = port;
    }
    
    @Override
    public void run() {
        // Open server socket and wait for connection
        try {
            ServerSocket serverSocket = new ServerSocket(port);
            Socket socket = serverSocket.accept();

            // Continually loop over blocking data queue until stopped
            BufferedWriter dataOut = new BufferedWriter(new OutputStreamWriter(socket.getoutputStream()));
            while(socket.isConnected()) {
                dataOut.write(queue.take());
                dataOut.newLine(); // delimit strings with a line separator
            }
            
            // Loop never exits because client socket never completes because of BufferedReader issue
            // so sockets never close and application never terminates
            socket.close();
            serverSocket.close();
        } catch (IOException | InterruptedException e) {
            e.printstacktrace();
        }
    }

    public BlockingQueue<String> getQueue() {
        // Return a reference to the sending queue to be populated by other threads
        return this.queue;
    }
}
package serverTest;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.socket;

public class ClientSocketComms implements Runnable {

    private final String server;
    private final int port;
    
    public ClientSocketComms(String server,int port) {
        this.server = server;
        this.port = port;
    }
    
    @Override
    public void run() {
        // Open socket to server and wait for incoming data
        try {
            Socket socket = new Socket(server,port);
            BufferedReader dataIn = new BufferedReader(new InputStreamReader(socket.getInputStream()));

            // Continually loop over incoming data until stopped
            String data;
            while((data = dataIn.readLine()) != null) {
                // Should print out every line as it's received,// but instead waits until buffer is full
                // (outputs about 170 lines at a time)
                System.out.println(data);
            }
            
            // Close socket and thread will die
            // (but loop never ends because buffer doesn't get completely refilled)
            socket.close();
        } catch (IOException e) {
            e.printstacktrace();
        }
    }
}

解决方法

您的服务器正在使用BufferedWriter

BufferedWriter dataOut = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));

这会执行您不喜欢的缓冲。似乎默认的缓冲区大小是您看到的8k,尽管该大小未在API中记录,并且可能会更改。如果您想要确保到目前为止存储在缓冲区中的所有数据都立即发送到客户端,请尝试使用dataOut.flush()刷新缓冲区。看看BufferedWriter API了解详情。

顺便说一句,我还没有检查您的代码中是否还有其他问题。但是以上绝对是一个。