问题描述
我想使用kqueue来监视文件的更改。我可以看到如何以线程方式使用select.kqueue()。
我正在寻找一种将其与asyncio结合使用的方法。我可能错过了一些真正明显的东西。我知道python使用kqueue for asyncio on macos。对于任何仅在使用kqueue选择器时有效的解决方案,我感到很高兴。
到目前为止,我唯一能看到的方法是创建一个线程,以从另一个线程连续kqueue.control()
开始,然后使用asyncio.loop.call_soon_threadsafe()
注入事件。我觉得应该有更好的方法。
解决方法
您可以使用loop.add_reader()将来自kqueue对象的FD作为读取器添加到控制循环中。然后,控制循环将通知您事件已准备好收集。
执行此操作的两个功能对于熟悉kqueue的人可能很奇怪:
- select.kqueue.control是一种一次性方法,该方法首先更改监视器,然后等待新事件到达。因为我们从不希望它阻塞,所以必须将这两个操作分为一个非阻塞调用以修改监视器,然后再分为第二个非阻塞调用以收集生成的事件。
- 因为我们永远不想阻塞,所以永远无法使用超时。可以使用
asyncio.wait_for()
重新实现
有更有效的方法来编写此代码,但是这里有一个示例,说明如何用异步方法(此处称为select.kqueue.control
)完全替换kqueue_control
:
async def kqueue_control(kqueue: select.kqueue,changes: Optional[Iterable[select.kevent]],max_events: int,timeout: Optional[int]):
def receive_result():
try:
# Events are ready to collect; fetch them but do not block
results = kqueue.control(None,max_events,0)
except Exception as ex:
future.set_exception(ex)
else:
future.set_result(results)
finally:
loop.remove_reader(kqueue.fileno())
# If this call is non-blocking then just execute it
if timeout == 0 or max_events == 0:
return kqueue.control(changes,0)
# Apply the changes,but DON'T wait for events
kqueue.control(changes,0)
loop = asyncio.get_running_loop()
future = loop.create_future()
loop.add_reader(kqueue.fileno(),receive_result)
if timeout is None:
return await future
else:
return await asyncio.wait_for(future,timeout)