如何在异步中使用kqueue进行文件监视?

问题描述

我想使用kqueue来监视文件的更改。我可以看到如何以线程方式使用select.kqueue()。

我正在寻找一种将其与asyncio结合使用的方法。我可能错过了一些真正明显的东西。我知道python使用kqueue for asyncio on macos。对于任何仅在使用kqueue选择器时有效的解决方案,我感到很高兴。

到目前为止,我唯一能看到的方法是创建一个线程,以从另一个线程连续kqueue.control()开始,然后使用asyncio.loop.call_soon_threadsafe()注入事件。我觉得应该有更好的方法

解决方法

您可以使用loop.add_reader()将来自kqueue对象的FD作为读取器添加到控制循环中。然后,控制循环将通知您事件已准备好收集。

执行此操作的两个功能对于熟悉kqueue的人可能很奇怪:

  • select.kqueue.control是一种一次性方法,该方法首先更改监视器,然后等待新事件到达。因为我们从不希望它阻塞,所以必须将这两个操作分为一个非阻塞调用以修改监视器,然后再分为第二个非阻塞调用以收集生成的事件。
  • 因为我们永远不想阻塞,所以永远无法使用超时。可以使用asyncio.wait_for()
  • 重新实现

有更有效的方法来编写此代码,但是这里有一个示例,说明如何用异步方法(此处称为select.kqueue.control)完全替换kqueue_control

async def kqueue_control(kqueue: select.kqueue,changes: Optional[Iterable[select.kevent]],max_events: int,timeout: Optional[int]):

    def receive_result():
        try:
            # Events are ready to collect; fetch them but do not block
            results = kqueue.control(None,max_events,0)
        except Exception as ex:
            future.set_exception(ex)
        else:
            future.set_result(results)
        finally:
            loop.remove_reader(kqueue.fileno())
            
    # If this call is non-blocking then just execute it
    if timeout == 0 or max_events == 0:
        return kqueue.control(changes,0)
    
    # Apply the changes,but DON'T wait for events
    kqueue.control(changes,0)
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    loop.add_reader(kqueue.fileno(),receive_result)
    if timeout is None:
        return await future
    else:
        return await asyncio.wait_for(future,timeout)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...