ktor websocket flow api如何工作?

问题描述

我正在使用ktor通过websocket进行服务器端开发。

文档向我们展示了使用传入渠道的示例:

for (frame in incoming.mapNotNull { it as? Frame.Text }) {
    // some
}

但是mapNotNull标记为不赞成Flow。我应该如何使用该API,可能会有什么问题?例如,Flow是冷流。这意味着将在每个collect调用生产函数。它如何在websocket的上下文中工作。它会在第二次collect通话时重新打开,还是可能在下一个collect之后发送一次旧邮件?如何收集N邮件,然后停止收集,然后再次收集?

预先感谢:)

解决方法

我应该如何使用此API?可能有什么问题?

我正在使用的以及在文档中某个示例中看到的是在consumeAsFlow()上调用的ReceiveChannel方法。这是整个代码段:

webSocket("/websocket") { //this: DefaultWebSocketServerSession
    incoming
        .consumeAsFlow()
        .map { receive(it) }
        .collect()
}

这种方法还没有发现重大问题。您应该意识到的一件事(但对于非流方法也是如此)是,如果您将自己扔进流中,那么它将破坏WebSocket连接,这通常不是您想要执行的操作。可能值得考虑将整个内容包装在try-catch中。

会在第二次催收呼叫中重新打开它,还是可能在下一次催收后将旧邮件发送一次?

您甚至在开始使用流中的消息之前就打开了websocket。您可以看到在webSocket() {}内部,您处于DefaultWebSocketServerSession的上下文中。这是您的连接管理。在流内部,您只是在消息到达时(建立连接之后)一个接一个地接收消息。如果连接断开,那么您就没钱了。必须先重新建立它,然后才能处理您的消息。该建立位是通过Route.webSocket()方法完成的。我建议您看看它的Javadoc。

如果您希望在关闭连接后进行一些清理,则可以添加一个finally块,如下所示:

webSocket("/chat") {
    try {
        incoming
            .consumeAsFlow()
            .map { receive(it,client) }
            .collect()
    } finally {
        // cleanup
    }
}

简而言之:collect每条收到的消息被调用一次。如果没有连接(或连接断开),则不会调用collect

如何收集N条消息,然后停止收集,然后再次收集?

这是什么用例?我认为您不应该随心所欲地这样做。您当然可以从流中take(n)个项目,但是您将无法再从中获取更多内容。