为 websocket 提升堆栈协程,如何发布一个函数并从另一个线程恢复执行 用asio异步操作实现select使用异步事件实现选择使用异步锁存器实现选择

问题描述

int main()
{
    tcp::socket socket(iocp);
    acceptor.async_accept(socket,yield[ec]);
    if (ec)
        fail(ec,"accept");
    else
        boost::asio::spawn(acceptor.get_executor(),std::bind(&do_session,websocket::stream<beast::tcp_stream>(std::move(socket)),std::placeholders::_1));

    ... iocp run
}

void do_session(websocket::stream<beast::tcp_stream>& ws,net::yield_context yield)
{
    while(ws.is_open())
    {
        ws.async_read(buffer,yield[ec]);
        ... process the buffer
        ... execute posted callbacks
    }
}

void another_thread()
{
    while(isAppNotExit)
    {
        post_to_specified_coroutine(ws,[]() {   ... do in courutine same thread });
    }
}

我需要在任何线程中发布一个函数来让指定的协程运行该函数,即上面“执行发布的回调”的代码部分。但是这个任务下发后,协程可能会处于 async_read 或 async_write 状态。是否可以发布一个类似数据的事件并让 async_read 或 async_write 函数立即返回?

解决方法

我想问题的本质是这样的:在 2 个通道上使用 select:一个容量为 1 的通道和一个(可能)无限容量的通道。

用asio异步操作实现select

编写一个 asio 异步操作来等待多个(两个)事物。
(asio 异步操作模板:c++ - How to wait for a function to return with Boost:::Asio? - Stack Overflow)。

受互斥锁保护的状态:

  • a std::optional<read_result>
  • a std::vector<functor>
  • a bool(是否有正在进行的 async_read)
  • a std::optional<completion handler>

您的async_wait_for_2_things

  1. 从完成令牌 (yield[ec]) 中获取完成处理程序(一个可调用的,可以恢复您的协程);
  2. 锁定互斥锁(使用guard);
  3. 如果有来自another_thread的pending functor,取出来,发布完成处理程序;
  4. else 如果有待处理的 read_result,将其取出,发布完成处理程序;
  5. 否则,如果有一个正在进行的 async_read(布尔值为真),则存储完成处理程序(如果已经存储了一个完成处理程序,则抛出“不可能发生”);
  6. else(没有挂起的函子,没有挂起的 read_result,async_read 还没有启动),存储完成处理程序(如果已经存储了一个完成处理程序,抛出“不可能发生”),将 bool 设置为 true(如果bool 已经为真,抛出“不能发生”),调用 async_read;
  7. 解锁互斥锁;

async_read 的回调:

  1. 锁定互斥锁(使用guard);
  2. 将 bool 设置为 false(如果 bool 已经为 false,则抛出“不可能发生”);
  3. 如果有完成处理程序,将其取出,发布;
  4. 否则,存储read_result(如果已经存储了read_result,则抛出“不可能发生”);
  5. 解锁互斥锁;

another_thread 发布函子的代码:

  1. 锁定互斥锁(使用guard);
  2. 如果有完成处理程序,将其取出,发布;
  3. 否则,存储函子;
  4. 解锁互斥锁;

使用异步事件实现选择

  1. async_read(使用回调重载)的 lambda 完成处理程序:存储结果,通知 asynchronous_event;
  2. another_thread:存储函子,通知异步事件;
  3. do_session:异步等待 asynchronous_event,加载结果或函子;
  4. asynchronous_event 的数据位于受互斥锁保护的 std::pair<std::optional<read_result>,std::vector<functor>> 中;

使用定时器实现异步事件:c++ - Why does Boost.Asio not support an event-based interface? - Stack Overflow

这不适用,因为“异步事件”不是“异步条件变量”,它不能:

  • 在异步等待中以原子方式
  • 释放互斥锁

(可能的顺序:do_session 释放互斥锁,然后发布函子,然后通知事件(cancel_one),然后 do_session 等待事件(timer_.async_wait(yield[ec]);)并永远阻塞)


使用异步锁存器实现选择

  1. async_read(使用回调重载)的lambda handler:①存储结果并重置asynchronous_latch_producer,②通知asynchronous_latch_consumer,等待asynchronous_latch_producer(,⑥wake up);
  2. another_thread:①存储函子并重置asynchronous_latch_producer,②通知asynchronous_latch_consumer,等待asynchronous_latch_producer(,⑥wake up);
  3. do_session:等待asynchronous_latch_consumer(,③wake up),④加载结果或函子并重置asynchronous_latch_consumer,⑤通知asynchronous_latch_producer;
  4. asynchronous_latch_consumer 和 asynchronous_latch_producer 的数据在一个 std::pair<std::optional<read_result>,std::vector<functor>> 中;

使用定时器实现异步锁存器:c++ - Cancelling boost asio deadline timer safely - Stack Overflow。修改异步事件实现以获取异步闩锁:在构造函数和 reset.expires_at(Timer::clock_type::time_point::max()) 中;在notify_all_one_shot.expires_at(Timer::clock_type::time_point::min())

这不适用,因为其中一个生产者可能会永远阻塞。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...