问题描述
Netty-Reactor刚刚开始,并且想知道是否有人提供有关如何使用UdpServer加入多播组并处理传入数据包的示例。我还想使用UdpClient通过网络发送消息。
当前文档对示例的内容很少。
我很困惑我需要写多少Netty自定义对象与Netty-Reactor项目对我隐藏的内容。我看到有UdpInbound和UdpOutbound ..这些与通道编码器和解码器有什么关系?我还需要那些吗?
我想举一个UDP服务器侦听多播组并侦听发送到该组的Google Protobuf消息的示例。我知道如何从头完成所有protobuf处理。.
我只是不确定如何使用Netty-Reactor在线进行实际的收听。
我看到有一个UdpInbound.join(),它看起来像我想要的..但那之后我迷路了。
有人可以给我指出例子吗?
谢谢
更新:
@Violeta-感谢您的帮助和链接!它为我带来了一点点帮助。
我认为我发现了一个错误?也许我误解了如何使IPv4和IPv6在同一服务器上工作。我在您的示例代码中注意到您使用了.runOn(resources,InternetProtocolFamily.IPv4)
。
没有此代码,当我的InetInterface既分配了IPv4地址又分配了IPv6地址时,绑定将失败。
由于该代码将在多宿主计算机上运行,因此我想在同一服务器和所有接口上同时支持IPv4 / IPv6。
除了使用InternetProtocolFamily
方法外,我看不到在连接上指定.runOn
的方法。
我收到以下错误:
IFace: name:en0 (en0) Address class: /fe80:0:0:0:1ce8:a828:3c15:4f4b%en0 is Inet4Address = false
IFace: name:en0 (en0) Address class: /192.168.1.151 is Inet4Address = true
09:42:35.615 [udp-nio-1] INFO server.Application - Joining iFace name:en0 (en0) to Multicast Group /224.0.0.224
09:42:35.621 [udp-nio-1] ERROR reactor.Flux.ConcatArray.1 - onError(java.lang.IllegalArgumentException: IPv6 socket cannot join IPv4 multicast group)
09:42:35.623 [udp-nio-1] ERROR reactor.Flux.ConcatArray.1 -
java.lang.IllegalArgumentException: IPv6 socket cannot join IPv4 multicast group
at sun.nio.ch.DatagramChannelImpl.innerJoin(DatagramChannelImpl.java:814)
at sun.nio.ch.DatagramChannelImpl.join(DatagramChannelImpl.java:900)
at io.netty.channel.socket.nio.NioDatagramChannel.joinGroup(NioDatagramChannel.java:414)
at io.netty.channel.socket.nio.NioDatagramChannel.joinGroup(NioDatagramChannel.java:391)
at io.netty.channel.socket.nio.NioDatagramChannel.joinGroup(NioDatagramChannel.java:384)
at reactor.netty.udp.UdpOperations.join(UdpOperations.java:65)
at server.Application.lambda$null$2(Application.java:45)
解决方法
UdpInbound.join()
返回Mono<Void>
,加入该组后发布者将完成。
之后,您可以开始使用UdpInbound.receive()
接收包裹了。
in.join(...)
.thenMany(in.receive()
...)
...
有关示例,请参见here。
您可以使用handler
方法来处理I / O(如链接中所示),也可以仅进行绑定,当收到Connection
时,可以调用{{1 }},然后您就可以加入并开始接收包裹。
关于解码器/编码器,我们建议您使用Reactor Core中的运算符来转换传入的包。如果您无法使用该方法实现此目的,则可以始终使用例如Connection.inbound()