Problem description
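I created a dummy server in Python, and I want to write a client for it in Java using RxJava. First, here is the Python server, which simply echoes back whatever it receives: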
import socket

# UDP socket bound to port 12000 on all interfaces
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.bind(('', 12000))

while True:
    message, address = server_socket.recvfrom(2048)
    if message:
        print(message.hex())
        # Echo the datagram back to the address it came from
        server_socket.sendto(message, address)
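So I expect to send a message and receive it back. For the client I have three classes:
- Appcli: the app itself; runs the application
- UdpObservable: implements an Observable; used to read incoming messages
- UdpWriter: sends messages to the server

Appcli: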
public class Appcli {
    private static final int PORT_NO = 12000;
    private UdpWriter udpWriter;
    // Subscription handle; its declaration is not shown in the original listing.
    private Disposable disposableudpData;

    public void start() {
        udpWriter = new UdpWriter(PORT_NO);
        final Observable<DatagramPacket> udpObservable = UdpObservable.create(1024);
        disposableudpData = udpObservable
                .observeOn(Schedulers.io())
                .subscribe(
                        new Consumer<DatagramPacket>() {
                            @Override
                            public void accept(DatagramPacket datagramPacket) throws Exception {
                                System.out.println("received datagram");
                                System.out.println(new String(datagramPacket.getData()));
                            }
                        }, new Consumer<Throwable>() {
                            @Override
                            public void accept(Throwable throwable) throws Exception {
                                System.out.println("udp observable failed");
                                System.out.println("ERROR: " + throwable.getMessage());
                            }
                        }
                );
        try {
            // hexStringToByteArray is a helper that is not shown in the question.
            udpWriter.sendbroadcast(hexStringToByteArray("TEST MESSAGE"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        // test, used for some delay...
        for (int i = 0; i < 5; i++) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
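The listing calls `hexStringToByteArray`, which the question does not show. The sketch below is a hypothetical stand-in, assuming the helper parses pairs of hex digits into bytes; the class name `HexBytes` is also an assumption. Under that assumption, "TEST MESSAGE" is not valid hex input and would be rejected, so plain text would more typically be converted with `String.getBytes`.

```java
import java.nio.charset.StandardCharsets;

public class HexBytes {
    /** Hypothetical stand-in for the helper the question does not show. */
    public static byte[] hexStringToByteArray(String hex) {
        if (hex.length() % 2 != 0) {
            throw new IllegalArgumentException("hex string must have an even length");
        }
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            // Character.digit returns -1 for anything that is not a hex digit
            int hi = Character.digit(hex.charAt(2 * i), 16);
            int lo = Character.digit(hex.charAt(2 * i + 1), 16);
            if (hi < 0 || lo < 0) {
                throw new IllegalArgumentException("non-hex character near index " + (2 * i));
            }
            out[i] = (byte) ((hi << 4) | lo);
        }
        return out;
    }

    public static void main(String[] args) {
        // "54455354" is the hex encoding of the ASCII text "TEST"
        byte[] fromHex = hexStringToByteArray("54455354");
        System.out.println(new String(fromHex, StandardCharsets.US_ASCII));

        // Plain text goes through getBytes instead of a hex parser
        byte[] fromText = "TEST MESSAGE".getBytes(StandardCharsets.UTF_8);
        System.out.println(fromText.length);
    }
}
```

The server prints `message.hex()`, so either route produces readable output on the server side; the difference is only in which bytes are sent.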
Now UdpObservable:
public class UdpObservable {
    private static Cancellable getCancellable(final DatagramSocket udpSocket) {
        return new Cancellable() {
            @Override
            public void cancel() throws Exception {
                if (!udpSocket.isClosed()) {
                    udpSocket.close();
                }
            }
        };
    }

    /**
     * Creates an Observable that will emit all UDP datagrams of a UDP port.
     * <p>
     * This will be an infinite stream that ends when the observer unsubscribes, or when an error
     * occurs.
     * </p>
     */
    public static Observable<DatagramPacket> create(final int bufferSizeInBytes) {
        return Observable.create(
                new ObservableOnSubscribe<DatagramPacket>() {
                    @Override
                    public void subscribe(ObservableEmitter<DatagramPacket> emitter) throws Exception {
                        System.out.println("subscribed!");
                        final DatagramSocket udpSocket = new DatagramSocket();
                        emitter.setCancellable(getCancellable(udpSocket));
                        //noinspection InfiniteLoopStatement
                        while (true) {
                            try {
                                byte[] rcvBuffer = new byte[bufferSizeInBytes];
                                DatagramPacket datagramPacket = new DatagramPacket(rcvBuffer, rcvBuffer.length);
                                udpSocket.receive(datagramPacket);
                                if (datagramPacket.getLength() == 0) {
                                    System.out.println("Read zero bytes");
                                } else {
                                    System.out.println(Arrays.toString(datagramPacket.getData()));
                                }
                                emitter.onNext(datagramPacket);
                            } catch (Exception e) {
                                emitter.onError(e);
                            }
                        }
                    }
                }).subscribeOn(Schedulers.io());
    }
}
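The Javadoc above says the stream ends when the observer unsubscribes or an error occurs, yet the `while (true)` loop as written has no exit path, so it would keep iterating after `emitter.onError(e)`. The plain-Java sketch below (no RxJava; the class and method names are assumptions made for illustration) shows the underlying socket behavior that a terminating loop could rely on: closing a `DatagramSocket` from another thread makes a blocked `receive` throw a `SocketException`.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ReceiveLoopDemo {
    /** Returns true if the receive loop stopped because the socket was closed. */
    public static boolean loopEndsOnClose() throws Exception {
        final DatagramSocket socket = new DatagramSocket(); // bound to an ephemeral port
        final AtomicBoolean endedByClose = new AtomicBoolean(false);
        final CountDownLatch done = new CountDownLatch(1);

        Thread reader = new Thread(() -> {
            byte[] buffer = new byte[1024];
            try {
                // Loop condition gives the loop an exit path once the socket is closed
                while (!socket.isClosed()) {
                    DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                    socket.receive(packet); // blocks until data arrives or the socket is closed
                }
                endedByClose.set(socket.isClosed());
            } catch (SocketException e) {
                // Thrown when the socket is closed while receive() is blocked
                endedByClose.set(socket.isClosed());
            } catch (IOException e) {
                // Any other I/O failure: leave the flag false
            } finally {
                done.countDown();
            }
        });
        reader.start();

        Thread.sleep(200); // give the reader time to block in receive()
        socket.close();    // plays the role of the Cancellable in the listing
        boolean finished = done.await(2, TimeUnit.SECONDS);
        return finished && endedByClose.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("loop ended on close: " + loopEndsOnClose());
    }
}
```

In the RxJava version, the equivalent would be to leave the loop after calling `onError` or once the socket has been closed, rather than looping unconditionally.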
And the writer:
public class UdpWriter {
    private final int portNo;
    private DatagramSocket udpSocket;

    public UdpWriter(int portNo) {
        this.portNo = portNo;
    }

    public void sendbroadcast(final byte[] data) throws IOException {
        if (udpSocket == null) {
            udpSocket = new DatagramSocket();
        }
        Completable.fromAction(new Action() {
            @Override
            public void run() throws Exception {
                byte[] dataBytes = data;
                InetAddress udpSenderAddress = Inet4Address.getByName("127.0.0.1");
                DatagramPacket datagramPacket = new DatagramPacket(dataBytes, dataBytes.length, udpSenderAddress, portNo);
                udpSocket.send(datagramPacket);
            }
        }).subscribeOn(Schedulers.io()).subscribe();
    }
}
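When I run the application, the client prints subscribed! and the server prints the message, which shows that the message was received on the server side.
However, UdpObservable is also supposed to print the incoming data from the server (line 49, the print inside the receive loop), so I expected to see that output as well. Nothing is printed.
To check whether my client can receive the reply at all, I changed the send method in the writer to: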
public void sendbroadcast(final byte[] data) throws IOException {
    if (udpSocket == null) {
        udpSocket = new DatagramSocket();
    }
    Completable.fromAction(new Action() {
        @Override
        public void run() throws Exception {
            byte[] dataBytes = data;
            InetAddress udpSenderAddress = Inet4Address.getByName("127.0.0.1");
            // Same four-argument packet as in the first version of the writer
            DatagramPacket datagramPacket = new DatagramPacket(dataBytes, dataBytes.length, udpSenderAddress, portNo);
            udpSocket.send(datagramPacket);
            // Wait for the reply on the same socket that sent the request
            byte[] rcvBuffer = new byte[1024];
            DatagramPacket res = new DatagramPacket(rcvBuffer, rcvBuffer.length);
            udpSocket.receive(res);
            if (res.getLength() == 0) {
                System.out.println("Read zero bytes");
            } else {
                System.out.println(Arrays.toString(res.getData()));
                System.out.println("that was the data");
            }
        }
    }).subscribeOn(Schedulers.io()).subscribe();
}
Now it does print the incoming message from the server.
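A likely explanation for the difference between the two versions: the server replies with `sendto(message, address)`, where `address` is the source address and port of the datagram it received, that is, the writer's socket. UdpObservable creates its own `DatagramSocket` on a different ephemeral port, and nothing is ever sent to that port. The plain-Java sketch below illustrates this with an in-process echo server standing in for the Python one; the class name, method names, and timeouts are assumptions made for illustration.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;

public class UdpReplyDemo {
    /** Receives one datagram as text, or returns null if the timeout expires. */
    static String receiveOrNull(DatagramSocket socket, int timeoutMillis) throws IOException {
        socket.setSoTimeout(timeoutMillis);
        byte[] buffer = new byte[1024];
        DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
        try {
            socket.receive(packet);
        } catch (SocketTimeoutException e) {
            return null;
        }
        // Only the first getLength() bytes of the buffer belong to the datagram
        return new String(packet.getData(), packet.getOffset(), packet.getLength(),
                StandardCharsets.UTF_8);
    }

    /** Returns { what the second socket received, what the sending socket received }. */
    public static String[] run() throws Exception {
        InetAddress loopback = InetAddress.getByName("127.0.0.1");
        try (DatagramSocket echoServer = new DatagramSocket(0, loopback); // stand-in for the Python server
             DatagramSocket sender = new DatagramSocket();                // plays the role of UdpWriter's socket
             DatagramSocket other = new DatagramSocket()) {               // plays the role of UdpObservable's socket
            Thread server = new Thread(() -> {
                try {
                    byte[] buf = new byte[2048];
                    DatagramPacket in = new DatagramPacket(buf, buf.length);
                    echoServer.receive(in);
                    // Reply goes to the address and port the datagram came from
                    echoServer.send(new DatagramPacket(in.getData(), in.getLength(), in.getSocketAddress()));
                } catch (IOException ignored) {
                    // Socket closed or I/O failure: nothing to echo
                }
            });
            server.setDaemon(true);
            server.start();

            byte[] payload = "TEST MESSAGE".getBytes(StandardCharsets.UTF_8);
            sender.send(new DatagramPacket(payload, payload.length, loopback, echoServer.getLocalPort()));

            String seenByOther = receiveOrNull(other, 500);    // separate socket, separate port
            String seenBySender = receiveOrNull(sender, 2000); // the socket the request was sent from
            return new String[] { seenByOther, seenBySender };
        }
    }

    public static void main(String[] args) throws Exception {
        String[] result = run();
        System.out.println("other socket:   " + result[0]);
        System.out.println("sending socket: " + result[1]);
    }
}
```

If this is indeed the cause, one way forward would be to share a single `DatagramSocket` between UdpWriter and UdpObservable, or to bind the observable's socket to a known port and have the server reply there.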