套接字通道无效的流标头:00000000

问题描述

我想序列化一个 'Message' 对象。我可以通过 SocketChannel 成功地将它作为字节数组传输出去。之后,我更改了对象的属性(这可能使它的序列化结果变得更大),然后在将对象发送回客户端时出现了问题。一旦我尝试在客户端获取该对象,就会得到一个异常,它发生在 getResponse() 方法中反序列化 Message 对象时:

org.apache.commons.lang3.SerializationException: java.io.StreamCorruptedException: invalid stream header: 00000000

但奇怪的是,这个问题只出现在第一个客户端上(抛出异常后,与第一个客户端的连接就结束了)。当我启动一个新的客户端(不关闭服务器)时,就可以成功地来回传输对象,而且此后启动的任何新客户端也都正常。

这是我的最小可调试版本:

import org.apache.commons.lang3.SerializationUtils;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

/**
 * Console client from the question: sends a serialized {@code Message} to the server at
 * localhost:5454 over a {@link SocketChannel} and prints the reply.
 *
 * <p>NOTE(review): this is the failing version under discussion; the code is left untouched
 * and only annotated.
 */
public class Client {

    // Channel to the server; opened in start() and closed in stop().
    private SocketChannel server;

    /**
     * Connects to the server, then repeatedly reads a request line and a number from stdin
     * and sends them, until the request line equals "exit".
     *
     * @throws IOException if closing the channel in {@link #stop()} fails
     */
    public void start() throws IOException {
        try {
            server = SocketChannel.open(new InetSocketAddress("localhost",5454));
            // Non-blocking mode: a later read() may return 0 when no data has arrived yet.
            server.configureBlocking(false);
        } catch (IOException e) {
            System.err.println("Server isn't responding");
            // NOTE(review): exits with status 0 even though connecting failed.
            System.exit(0);
        }

        // NOTE(review): two Scanners wrap the same System.in.
        Scanner scRequest = new Scanner(System.in);
        Scanner scState = new Scanner(System.in);


        System.out.println("Enter request:");
        String request = scRequest.nextLine();

        while (!request.equals("exit")) {
            try {
                // In my actual project class Person is a way different (But it's still a POJO)
                // I included it here to make sure I can get it back after sending to the server
                System.out.println("Enter a number:");
                Person person = new Person(scState.nextInt());
                sendRequest(request,person);

                System.out.println("\nEnter request:");
                request = scRequest.nextLine();
            } catch (Exception e) {
                // The failure is printed and the loop runs again with the unchanged request,
                // because `request` is only reassigned after a successful sendRequest().
                e.printStackTrace();
            }
        }

        stop();
    }

    /**
     * Wraps the arguments in a {@code Message}, serializes it, writes it to the server and
     * then reads the response. On any exception the JVM is terminated.
     *
     * @param sMessage command text placed into the message
     * @param person   object attached to the message
     */
    public void sendRequest(String sMessage,Person person) {
        Message message = new Message(sMessage,person);
        ByteBuffer requestBuffer = ByteBuffer.wrap(SerializationUtils.serialize(message));
        try {
            // Single write() call; the return value (bytes actually written) is not checked.
            server.write(requestBuffer);
            requestBuffer.clear();
            // Called immediately after the write, with the channel still non-blocking.
            getResponse();
        } catch (Exception e) {
            // Only the message is printed, not the stack trace.
            System.out.println(e.getMessage());
            System.err.println("Connection lost");
            System.exit(0);
        }
    }

    /**
     * Reads the server response with a single read(), deserializes it into a
     * {@code Message} and prints it.
     *
     * @throws Exception if the channel reports end-of-stream (read() returned -1) or
     *                   deserialization fails
     */
    public void getResponse() throws Exception {
        // 64 MiB buffer allocated on every call.
        ByteBuffer responseBuffer = ByteBuffer.allocate(1024 * 1024 * 64);

        int read = server.read(responseBuffer);
        // clear() after the read resets position to 0 and limit to capacity, so the number
        // of bytes actually read is no longer reflected in the buffer state.
        responseBuffer.clear();
        if(read == -1) {
            throw new Exception();
        }

        // limit() equals the capacity here, so the array spans the whole buffer, including
        // the zero bytes that were never written by read().
        byte[] bytes = new byte[responseBuffer.limit()];
        responseBuffer.get(bytes);

        // NOTE(review): when read == 0 the array holds only zeros, which matches the
        // reported "invalid stream header: 00000000".
        Message message = SerializationUtils.deserialize(bytes);
        System.out.println(message);
    }

    /**
     * Closes the channel to the server.
     *
     * @throws IOException if closing fails
     */
    public void stop() throws IOException {
        server.close();
    }

    /** Entry point: creates a client and runs its interactive loop. */
    public static void main(String[] args) throws IOException {
        Client client = new Client();
        client.start();
    }
}
import org.apache.commons.lang3.SerializationUtils;

import java.io.*;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Selector-based server from the question: accepts clients on localhost:5454, deserializes
 * each incoming {@code Message}, sets its result and sends it back.
 *
 * <p>NOTE(review): this is the original version under discussion; the code is left untouched
 * and only annotated.
 */
public class Server {

    /**
     * Opens the selector and the listening channel, then dispatches accept and read events
     * in an endless loop.
     *
     * @throws IOException if opening, binding, selecting or accepting fails
     */
    public void start() throws IOException {

        Selector selector = Selector.open();
        ServerSocketChannel serverSocket = ServerSocketChannel.open();
        serverSocket.bind(new InetSocketAddress("localhost",5454));
        serverSocket.configureBlocking(false);
        serverSocket.register(selector,SelectionKey.OP_ACCEPT);

        System.out.println("Server started");

        while (true) {
            // Blocks until at least one registered channel is ready.
            selector.select();
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> iter = selectedKeys.iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                if (key.isAcceptable()) {
                    register(selector,serverSocket);
                }
                if (key.isReadable()) {
                    try {
                        getRequest(key);
                    } catch (Exception e) {
                        // Only the message is logged; the client channel is not closed here.
                        System.err.println(e.getMessage());
                    }
                }
                // Handled keys must be removed from the selected set by hand.
                iter.remove();
            }
        }
    }

    /**
     * Reads one request from the key's channel with a single read(), deserializes it and
     * sends the response.
     *
     * @param key selection key whose channel is readable
     * @throws Exception if the client disconnected (read() returned -1) or the request
     *                   cannot be deserialized or answered
     */
    private void getRequest(SelectionKey key) throws Exception {
        SocketChannel client = (SocketChannel) key.channel();

        // 1 MiB buffer allocated on every call.
        ByteBuffer requestBuffer = ByteBuffer.allocate(1024 * 1024);
        int read = client.read(requestBuffer);
        // clear() after the read resets position to 0 and limit to capacity, so the number
        // of bytes actually read is no longer reflected in the buffer state.
        requestBuffer.clear();

        if(read == -1) {
            // End of stream: stop selecting on this channel and report the disconnect.
            key.cancel();
            throw new Exception("Client disconnected at: " +
                    ((SocketChannel) key.channel()).socket().getRemoteSocketAddress());
        }

        // limit() equals the capacity here, so the array spans the whole buffer; bytes past
        // the ones read are zeros.
        byte[] bytes = new byte[requestBuffer.limit()];
        requestBuffer.get(bytes);

        Message message = SerializationUtils.deserialize(bytes);
        sendResponse(client,message);
    }

    /**
     * Sets the result on the message, serializes it and writes it fully to the client.
     *
     * @param client  channel to write the response to
     * @param message request message that is mutated and echoed back
     * @throws IOException if writing fails
     */
    private void sendResponse(SocketChannel client,Message message) throws IOException {

        message.setResult("Some result");

        ByteBuffer responseBuffer = ByteBuffer.wrap(SerializationUtils.serialize(message));
        // Loops because a single write() may not drain the whole buffer.
        while (responseBuffer.hasRemaining()) {
            client.write(responseBuffer);
        }
        responseBuffer.clear();
    }

    /**
     * Accepts a pending connection, switches it to non-blocking mode and registers it with
     * the selector for read events.
     *
     * @param selector     selector the new client channel is registered with
     * @param serverSocket listening channel to accept from
     * @throws IOException if accepting or configuring the channel fails
     */
    private void register(Selector selector,ServerSocketChannel serverSocket) throws IOException {
        SocketChannel client = serverSocket.accept();
        client.configureBlocking(false);
        client.register(selector,SelectionKey.OP_READ);
        System.out.println("New client at: " + client.socket().getRemoteSocketAddress());
    }

    /** Entry point: starts the server loop. */
    public static void main(String[] args) throws Exception {
        new Server().start();
    }
}

我尝试将此对象作为字节数组发送:

import java.io.Serializable;
import java.util.Formatter;

/**
 * Request/response envelope exchanged between client and server via Java serialization.
 *
 * <p>Carries a command string, an attached {@code Person} and a result string. The result
 * is not set by the constructor and stays {@code null} until {@link #setResult(String)} is
 * called.
 */
public class Message implements Serializable {

    /**
     * Explicit stream version. Without it the JVM derives one from the class shape, so two
     * peers compiled from slightly different sources would reject each other's streams.
     */
    private static final long serialVersionUID = 1L;

    private String command;
    private Person person;
    private String result;

    /**
     * Creates a message without a result.
     *
     * @param command command text
     * @param person  object attached to the message
     */
    public Message(String command,Person person) {
        this.command = command;
        this.person = person;
    }

    /** @return the command text */
    public String getCommand() {
        return command;
    }

    /** @param executedCommand new command text */
    public void setCommand(String executedCommand) {
        this.command = executedCommand;
    }

    /** @return the attached object */
    public Person getPerson() {
        return person;
    }

    /** @param person new attached object */
    public void setPerson(Person person) {
        this.person = person;
    }

    /** @return the result text, or {@code null} if none has been set */
    public String getResult() {
        return result;
    }

    /** @param result new result text */
    public void setResult(String result) {
        this.result = result;
    }

    /**
     * Renders command, attached object and result on three lines.
     *
     * @return multi-line description of this message
     */
    @Override
    public String toString() {
        // String.format yields the same text as the previous Formatter chain without
        // creating a Formatter that is never closed.
        return String.format("Command: %s\nAttached object: %s\nResult: %s", command, person, result);
    }
}

我在 Message obj 中包含了这个类的实例:

/**
 * Immutable, serializable value object holding a single integer state. It is attached to a
 * {@code Message} so it can travel to the server and back.
 */
public class Person implements Serializable {

    /**
     * Explicit stream version. Without it the JVM derives one from the class shape, so two
     * peers compiled from slightly different sources would reject each other's streams.
     */
    private static final long serialVersionUID = 1L;

    private final int state;

    /**
     * @param state value carried by this person
     */
    public Person(int state) {
        this.state = state;
    }

    /**
     * @return the text {@code "Person state: "} followed by the state value
     */
    @Override
    public String toString() {
        return "Person state: " + state;
    }
}

我不知道出了什么问题,希望得到您的帮助。

UPD:我使用 'org.apache.commons:commons-lang3:3.5' 依赖项将对象序列化为字节数组

解决方法

我之前从未使用过 Java NIO 通道(channel),所以我不是专家。但我发现了几个问题:

一般:

  • 为了调试您的代码,使用 e.printStackTrace() 而不是仅使用 System.out.println(e.getMessage()) 会很有帮助。

客户端:

  • SocketChannel server 在客户端应该配置为阻塞,否则它可能会读取 0 字节,因为还没有服务器响应,这会导致您的问题。
  • 应该始终在读取数据之前调用 ByteBuffer.clear(),而不是之后。
  • 读取后,在调用 get(byte[]) 之前,必须通过 responseBuffer.position(0) 将字节缓冲区的位置重置为 0,否则读到的将是刚读入的数据之后的未定义字节。
  • 您应该根据读取的字节数而不是字节缓冲区大小来调整字节数组的大小。反过来也行,但效率低下。

服务器:

  • 应该始终在读取数据之前调用 ByteBuffer.clear(),而不是之后。
  • 读取后,在调用 get(byte[]) 之前,必须通过 requestBuffer.position(0) 将字节缓冲区的位置重置为 0,否则读到的将是刚读入的数据之后的未定义字节。
  • 在 getRequest(key) 调用期间捕获到异常时,应该关闭相应的通道,否则在客户端断开连接后,服务器会无休止地尝试从该通道读取,并用错误消息刷满控制台日志。我的修改处理了这种情况,并打印一条清晰的日志消息,说明关闭的是哪个客户端(远程套接字地址)。

警告:您的代码没有处理这样的情况:一侧写入通道的请求或响应大于另一侧 ByteBuffer 的最大容量。同样,理论上(反)序列化得到的 byte[] 也可能大于字节缓冲区。

这是我的差异,我希望你知道如何阅读差异:

Index: src/main/java/de/scrum_master/stackoverflow/q65890087/Client.java
===================================================================
--- a/src/main/java/de/scrum_master/stackoverflow/q65890087/Client.java (revision Staged)
+++ b/src/main/java/de/scrum_master/stackoverflow/q65890087/Client.java (date 1612321383172)
@@ -15,7 +15,7 @@
   public void start() throws IOException {
     try {
       server = SocketChannel.open(new InetSocketAddress("localhost",5454));
-      server.configureBlocking(false);
+      server.configureBlocking(true);
     }
     catch (IOException e) {
       System.err.println("Server isn't responding");
@@ -56,22 +56,24 @@
       getResponse();
     }
     catch (Exception e) {
-      System.out.println(e.getMessage());
+      e.printStackTrace();
+//      System.out.println(e.getMessage());
       System.err.println("Connection lost");
       System.exit(0);
     }
   }
 
   public void getResponse() throws Exception {
-    ByteBuffer responseBuffer = ByteBuffer.allocate(1024 * 1024 * 64);
+    ByteBuffer responseBuffer = ByteBuffer.allocate(1024 * 1024);
+    responseBuffer.clear();
 
     int read = server.read(responseBuffer);
-    responseBuffer.clear();
     if (read == -1) {
-      throw new Exception();
+      throw new Exception("EOF,cannot read server response");
     }
 
-    byte[] bytes = new byte[responseBuffer.limit()];
+    byte[] bytes = new byte[read];
+    responseBuffer.position(0);
     responseBuffer.get(bytes);
 
     Message message = SerializationUtils.deserialize(bytes);
Index: src/main/java/de/scrum_master/stackoverflow/q65890087/Server.java
===================================================================
--- a/src/main/java/de/scrum_master/stackoverflow/q65890087/Server.java (revision Staged)
+++ b/src/main/java/de/scrum_master/stackoverflow/q65890087/Server.java (date 1612323386278)
@@ -35,7 +35,11 @@
             getRequest(key);
           }
           catch (Exception e) {
-            System.err.println(e.getMessage());
+            e.printStackTrace();
+//            System.err.println(e.getMessage());
+            SocketChannel client = (SocketChannel) key.channel();
+            System.err.println("Closing client connection at: " + client.socket().getRemoteSocketAddress());
+            client.close();
           }
         }
         iter.remove();
@@ -45,15 +49,16 @@
 
   private void getRequest(SelectionKey key) throws Exception {
     SocketChannel client = (SocketChannel) key.channel();
-    ByteBuffer requestBuffer = ByteBuffer.allocate(1024 * 1024 * 64);
+    ByteBuffer requestBuffer = ByteBuffer.allocate(1024 * 1024);
+    requestBuffer.clear();
     int read = client.read(requestBuffer);
-    requestBuffer.clear();
     if (read == -1) {
       key.cancel();
       throw new Exception("Client disconnected at: " +
         ((SocketChannel) key.channel()).socket().getRemoteSocketAddress());
     }
-    byte[] bytes = new byte[requestBuffer.limit()];
+    byte[] bytes = new byte[read];
+    requestBuffer.position(0);
     requestBuffer.get(bytes);
     Message message = SerializationUtils.deserialize(bytes);
     sendResponse(client,message);

为了完整起见,以下是我更改后的完整类:

import org.apache.commons.lang3.SerializationUtils;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

/**
 * Interactive console client: sends a serialized {@code Message} to the server at
 * localhost:5454 over a blocking {@link SocketChannel} and prints the reply.
 *
 * <p>Known limitation: a response is expected to arrive in a single read() and to fit into
 * {@link #RESPONSE_BUFFER_SIZE} bytes; the wire format carries no length prefix.
 *
 * <p>Not thread-safe: the receive buffer is shared between calls.
 */
public class Client {

  /** Upper bound, in bytes, for one serialized response. */
  private static final int RESPONSE_BUFFER_SIZE = 1024 * 1024;

  /** Receive buffer reused for every response instead of allocating 1 MiB per call. */
  private final ByteBuffer responseBuffer = ByteBuffer.allocate(RESPONSE_BUFFER_SIZE);

  private SocketChannel server;

  /**
   * Connects to the server and runs the prompt loop until the user enters "exit" or stdin
   * is closed. The channel is closed when the loop ends.
   *
   * @throws IOException if closing the channel fails
   */
  public void start() throws IOException {
    try {
      server = SocketChannel.open(new InetSocketAddress("localhost",5454));
      // Blocking mode: read() waits for the server's answer instead of returning 0.
      server.configureBlocking(true);
    }
    catch (IOException e) {
      System.err.println("Server isn't responding");
      System.exit(0);
    }

    // One Scanner only: two Scanners over the same System.in each read ahead into their
    // own buffer and can swallow input meant for the other.
    Scanner console = new Scanner(System.in);
    try {
      System.out.println("Enter request:");
      // hasNextLine() ends the loop on closed stdin instead of throwing on every pass.
      while (console.hasNextLine()) {
        String request = console.nextLine();
        if (request.equals("exit")) {
          break;
        }

        // In my actual project class Person is a way different (But it's still a POJO)
        // I included it here to make sure I can get it back after sending to the server
        Person person = readPerson(console);
        if (person == null) {
          break;
        }
        sendRequest(request,person);

        System.out.println("\nEnter request:");
      }
    }
    finally {
      stop();
    }
  }

  /**
   * Prompts for an integer until a valid one is entered. Reading whole lines guarantees
   * that a rejected input is consumed, so a typo cannot cause an endless error loop.
   *
   * @param console scanner over stdin
   * @return a person built from the entered number, or {@code null} if stdin was closed
   */
  private Person readPerson(Scanner console) {
    System.out.println("Enter a number:");
    while (console.hasNextLine()) {
      String line = console.nextLine().trim();
      try {
        return new Person(Integer.parseInt(line));
      }
      catch (NumberFormatException e) {
        System.err.println("Not a valid integer: " + line);
        System.out.println("Enter a number:");
      }
    }
    return null;
  }

  /**
   * Serializes a message built from the arguments, writes it completely to the server and
   * then reads and prints the response. Any failure terminates the JVM.
   *
   * @param sMessage command text placed into the message
   * @param person   object attached to the message
   */
  public void sendRequest(String sMessage,Person person) {
    Message message = new Message(sMessage,person);
    ByteBuffer requestBuffer = ByteBuffer.wrap(SerializationUtils.serialize(message));
    try {
      // Loop until the buffer is drained, mirroring the server's sendResponse().
      while (requestBuffer.hasRemaining()) {
        server.write(requestBuffer);
      }
      getResponse();
    }
    catch (Exception e) {
      e.printStackTrace();
      System.err.println("Connection lost");
      System.exit(0);
    }
  }

  /**
   * Reads one response from the server, deserializes it and prints it.
   *
   * @throws Exception if the server closed the connection or the bytes received cannot be
   *                   deserialized into a {@code Message}
   */
  public void getResponse() throws Exception {
    // Reset the reused buffer BEFORE reading, never after.
    responseBuffer.clear();

    int read = server.read(responseBuffer);
    if (read == -1) {
      throw new IOException("EOF,cannot read server response");
    }

    // flip() sets limit to the bytes just read and position to 0, so exactly the received
    // bytes are copied and no stale buffer content is handed to the deserializer.
    responseBuffer.flip();
    byte[] bytes = new byte[responseBuffer.remaining()];
    responseBuffer.get(bytes);

    // NOTE(review): Java-native deserialization of network data; only acceptable while the
    // peer is trusted.
    Message message = SerializationUtils.deserialize(bytes);
    System.out.println(message);
  }

  /**
   * Closes the channel to the server.
   *
   * @throws IOException if closing fails
   */
  public void stop() throws IOException {
    server.close();
  }

  /** Entry point: creates a client and runs its interactive loop. */
  public static void main(String[] args) throws IOException {
    Client client = new Client();
    client.start();
  }
}
import org.apache.commons.lang3.SerializationUtils;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Single-threaded, selector-based server: accepts clients on localhost:5454, deserializes
 * each incoming {@code Message}, sets its result and sends it back.
 *
 * <p>Known limitation: a request is expected to arrive in a single read() and to fit into
 * {@link #REQUEST_BUFFER_SIZE} bytes; the wire format carries no length prefix.
 */
public class Server {

  /** Upper bound, in bytes, for one serialized request. */
  private static final int REQUEST_BUFFER_SIZE = 1024 * 1024;

  /**
   * Receive buffer shared by all clients. Sharing is safe because every read happens on
   * the one thread that runs the selector loop in {@link #start()}.
   */
  private final ByteBuffer requestBuffer = ByteBuffer.allocate(REQUEST_BUFFER_SIZE);

  /**
   * Opens the listening channel and dispatches accept and read events forever.
   *
   * @throws IOException if opening, binding, selecting or accepting fails; the selector
   *                     and the listening channel are closed before it propagates
   */
  public void start() throws IOException {
    try (Selector selector = Selector.open();
         ServerSocketChannel serverSocket = ServerSocketChannel.open()) {
      serverSocket.bind(new InetSocketAddress("localhost",5454));
      serverSocket.configureBlocking(false);
      serverSocket.register(selector,SelectionKey.OP_ACCEPT);
      System.out.println("Server started");

      while (true) {
        selector.select();
        Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
        while (iter.hasNext()) {
          SelectionKey key = iter.next();
          // Remove first so the key leaves the selected set even if handling it fails.
          iter.remove();
          // A cancelled key would make the isXxx() checks throw CancelledKeyException.
          if (!key.isValid()) {
            continue;
          }
          if (key.isAcceptable()) {
            register(selector,serverSocket);
          }
          if (key.isReadable()) {
            try {
              getRequest(key);
            }
            catch (Exception e) {
              e.printStackTrace();
              closeClient(key);
            }
          }
        }
      }
    }
  }

  /**
   * Cancels the key and closes its channel, logging which client was dropped. A failure to
   * close is logged rather than propagated so one bad connection cannot stop the server.
   *
   * @param key selection key of the client to drop
   */
  private void closeClient(SelectionKey key) {
    SocketChannel client = (SocketChannel) key.channel();
    System.err.println("Closing client connection at: " + client.socket().getRemoteSocketAddress());
    key.cancel();
    try {
      client.close();
    }
    catch (IOException closeFailure) {
      System.err.println("Failed to close client channel: " + closeFailure.getMessage());
    }
  }

  /**
   * Reads one request from the key's channel, deserializes it and sends the response.
   *
   * @param key selection key whose channel is readable
   * @throws Exception if the client disconnected or the request cannot be deserialized or
   *                   answered
   */
  private void getRequest(SelectionKey key) throws Exception {
    SocketChannel client = (SocketChannel) key.channel();

    // Reset the reused buffer BEFORE reading, never after.
    requestBuffer.clear();
    int read = client.read(requestBuffer);
    if (read == -1) {
      key.cancel();
      throw new IOException("Client disconnected at: " + client.socket().getRemoteSocketAddress());
    }
    if (read == 0) {
      // Non-blocking read with nothing available: not an error, wait for the next event.
      return;
    }

    // flip() sets limit to the bytes just read and position to 0, so exactly the received
    // bytes are copied and no stale buffer content is handed to the deserializer.
    requestBuffer.flip();
    byte[] bytes = new byte[requestBuffer.remaining()];
    requestBuffer.get(bytes);

    // NOTE(review): Java-native deserialization of network data; only acceptable while
    // all clients are trusted.
    Message message = SerializationUtils.deserialize(bytes);
    sendResponse(client,message);
  }

  /**
   * Sets the result on the message, serializes it and writes it completely to the client.
   *
   * @param client  channel to write the response to
   * @param message request message that is mutated and echoed back
   * @throws IOException if writing fails
   */
  private void sendResponse(SocketChannel client,Message message) throws IOException {
    message.setResult("Some result");
    ByteBuffer responseBuffer = ByteBuffer.wrap(SerializationUtils.serialize(message));
    // The channel is non-blocking, so one write() may send only part of the buffer.
    while (responseBuffer.hasRemaining()) {
      client.write(responseBuffer);
    }
  }

  /**
   * Accepts a pending connection, switches it to non-blocking mode and registers it with
   * the selector for read events.
   *
   * @param selector     selector the new client channel is registered with
   * @param serverSocket listening channel to accept from
   * @throws IOException if accepting or configuring the channel fails
   */
  private void register(Selector selector,ServerSocketChannel serverSocket) throws IOException {
    SocketChannel client = serverSocket.accept();
    if (client == null) {
      // In non-blocking mode accept() returns null when no connection is pending.
      return;
    }
    client.configureBlocking(false);
    client.register(selector,SelectionKey.OP_READ);
    System.out.println("New client at: " + client.socket().getRemoteSocketAddress());
  }

  /** Entry point: starts the server loop. */
  public static void main(String[] args) throws Exception {
    new Server().start();
  }
}

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...