问题描述
我将视频作为一系列图像(等于 zmq 消息)发送,但有时,也许当网络速度较慢时,它们的接收速度比发送速度慢,并且出现越来越长的延迟,似乎高达约一分钟的视频或数百张图片或兆字节的数据。它通常最终会自行清除,订阅者接收消息的速度比发布者发送的速度快。
相反,如果订阅者recv
处理它们的速度太慢,我希望它以应该的方式丢弃错过的消息。我希望 zmq.CONFLATE
=1 会这样做,但事实并非如此。那么如何?我怀疑它们在发布者处被缓冲,它不应该有任何 zmq 缓冲区,或者以某种方式在网络堆栈中。
简化的服务器代码
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:12345")
camera = PiCamera()
stream = io.BytesIO()
for _ in camera.capture_continuous(stream,'jpeg',use_video_port=True):
stream.truncate()
stream.seek(0)
socket.send(stream.read())
stream.seek(0)
简化的客户端代码
# Initialization
self.context = zmq.Context()
self.video_socket = self.context.socket(zmq.SUB)
self.video_socket.setsockopt(zmq.CONFLATE,1)
self.video_socket.setsockopt(zmq.SUBSCRIBE,b"")
self.video_socket.connect("tcp://" + ip_address + ":12345")
def get_image(self):
# Receive the latest image
poll_result = self.video_socket.poll(timeout=0)
if poll_result == zmq.POLLIN:
return self.video_socket.recv()
else:
return None
发布者使用 RaspBerry Pi,订阅者使用 Windows。
解决方法
我不确定您使用的是哪个版本的 python zmq,但基于您需要的底层 c++ libzmq:
- 在服务器套接字上设置 ZMQ_SNDHWM 套接字选项
- 在客户端套接字上设置 ZMQ_RCVHWM 套接字选项。
在发布/订阅的情况下,这些选项限制每个已完成连接排队的消息数量。如果队列增长大于 HWM(高水位线),消息将被丢弃。
同时关闭合并,因为这会干扰这些选项。
,还要在服务器上设置 zmq.CONFLATE
=1 以仅保留发送队列中的最新消息。
绑定服务端socket之前
socket.setsockopt(zmq.CONFLATE,1)
出于某种原因,我错误地认为 PUB 套接字没有发送队列,但它有。