带有datagramSockets的rxjava没有收到消息

问题描述

我用python创建了一个虚拟服务器,我想用rxjava在JAVA中创建一个实现它的客户端。 首先,这是python中的服务器,仅回显收到的消息:

import socket

server_socket = socket.socket(socket.AF_INET,socket.soCK_DGRAM)
server_socket.bind(('',12000))

while True:
    message,address = server_socket.recvfrom(2048)
    if (message):
        print(message.hex())
    server_socket.sendto(message,address)

所以我希望发送消息并接收回来。 对于客户,我有3个课程:

  1. Appcli-应用本身-运行应用
  2. UdpObservable-实现一个可观察的-用于读取来自阅读器的传入消息
  3. UdpWriter-将消息发送到服务器

Appcli-

public class Appcli {

    private static final int PORT_NO = 12000;

    private UdpWriter udpWriter;

    public void start() {
        udpWriter = new UdpWriter(PORT_NO);

        final Observable<DatagramPacket> udpObservable = UdpObservable.create(1024);

        disposableudpData = udpObservable
                .observeOn(Schedulers.io())
                .subscribe(
                        new Consumer<DatagramPacket>() {
                            @Override
                            public void accept(DatagramPacket datagramPacket) throws Exception {
                                System.out.println("received datagram");
                                System.out.println(new String(datagramPacket.getData()));
                            }
                        },new Consumer<Throwable>() {
                            @Override
                            public void accept(Throwable throwable) throws Exception {
                                System.out.println("udp observable Failed");
                                System.out.println("ERROR: " + throwable.getMessage());
                            }
                        }
                );

        try {
            udpWriter.sendbroadcast(hexStringToByteArray("TEST MESSAGE"));
        } catch (IOException e) {
            e.printstacktrace();
        }

        // test,used for some delay...
        for (int i = 0; i < 5; i++) {
            try {
                sleep(500);
            } catch (InterruptedException e) {
                e.printstacktrace();
            }
        }
    }
}

现在UdpObservable-

public class UdpObservable {
    private static Cancellable getCancellable(final DatagramSocket udpSocket) {
        return new Cancellable() {
            @Override
            public void cancel() throws Exception {
                if (!udpSocket.isClosed()) {
                    udpSocket.close();
                }
            }
        };
    }

    /**
     * creates an Observable that will emit all UDP datagrams of a UDP port.
     * <p>
     * This will be an infinite stream that ends when the observer unsubscribes,or when an error
     * occurs.
     * </p>
     */
    public static Observable<DatagramPacket> create(final int bufferSizeInBytes) {
        return Observable.create(
                new ObservableOnSubscribe<DatagramPacket>() {
                    @Override
                    public void subscribe(ObservableEmitter<DatagramPacket> emitter) throws Exception {
                        System.out.println("subscribed!");
                        final DatagramSocket udpSocket = new DatagramSocket();
                        emitter.setCancellable(getCancellable(udpSocket));
                        //noinspection InfiniteLoopStatement
                        while (true) {
                            try {
                                byte[] rcvBuffer = new byte[bufferSizeInBytes];
                                DatagramPacket datagramPacket = new DatagramPacket(rcvBuffer,rcvBuffer.length);
                                udpSocket.receive(datagramPacket);
                                if (datagramPacket.getLength() == 0) {
                                    System.out.println("Read zero bytes");
                                } else {
                                    System.out.println(Arrays.toString(datagramPacket.getData()));
                                }
                                emitter.onNext(datagramPacket);
                            } catch (Exception e) {
                                emitter.onError(e);
                            }
                        }
                    }
                }).subscribeOn(Schedulers.io());
    }
}

writer-

public class UdpWriter {

    private final int portNo;
    private DatagramSocket udpSocket;

    public UdpWriter(int portNo) {
        this.portNo = portNo;
    }

    public void sendbroadcast(final byte[] data) throws IOException {
        if (udpSocket == null) {
            udpSocket = new DatagramSocket();
        }

        Completable.fromAction(new Action() {
            @Override
            public void run() throws Exception {
                byte[] dataBytes = data;
                InetAddress udpSenderAddress = Inet4Address.getByName("127.0.0.1");
                DatagramPacket datagramPacket = new DatagramPacket(dataBytes,dataBytes.length,udpSenderAddress,portNo);
                udpSocket.send(datagramPacket);
            }
        }).subscribeOn(Schedulers.io()).subscribe();
    }

}

运行应用程序时,我在客户端得到subscribed!输出,实际上在服务器端我得到了消息的输出(这表明该消息是在服务器端收到的。) 但是,因为在UdpObservable中,我还想打印来自服务器(行49)的传入数据,所以我希望看到它也可以打印,但是什么也没发生,所以不会打印。

我想确保我的客户收到消息,因此我将writer中的send方法更改为:

public void sendbroadcast(final byte[] data) throws IOException {
    if (udpSocket == null) {
        udpSocket = new DatagramSocket();
    }

    Completable.fromAction(new Action() {
        @Override
        public void run() throws Exception {
            byte[] dataBytes = data;
            InetAddress udpSenderAddress = Inet4Address.getByName("127.0.0.1");
            DatagramPacket datagramPacket = new DatagramPacket(dataBytes,portNo);
            udpSocket.send(datagramPacket);
            byte[] rcvBuffer = new byte[1024];
            DatagramPacket res = new DatagramPacket(rcvBuffer,rcvBuffer.length);
            udpSocket.receive(res);
            if (res.getLength() == 0) {
                System.out.println("Read zero bytes");
            } else {
                System.out.println(Arrays.toString(res.getData()));
                System.out.println("that was the data");
            }
        }
    }).subscribeOn(Schedulers.io()).subscribe();
}

现在,它会打印来自服务器的传入消息。

所以我的问题是,为什么观察者内部的订阅方法没有混乱?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)