使用Fiber的带队列的Desiginig异步分布式计算例如Kafka

问题描述

我正在尝试解决与通信Master --> Queue --> Workers相关的设计问题。 MasterQueue中提交计算请求,其中一些Workers将其处理。我想使此操作join可行且cancel可行,使其与Fiber所提供的功能完全一样。

问题在于文档中指定的Fiber的定义如下:

Fiber代表Concurrent数据类型的(纯)结果(例如 IO

可以同时启动,可以加入也可以 已取消。

据我了解,Fiber应该用于并发(非分布式)计算。但是在我的情况下,cancel意味着向工作人员发送取消消息,而join意味着正在等待来自工作人员的消息,这意味着完成。这是它的样子

private def awaitFinishMessage[F[_]](taskId: Int): F[Unit] = //...
private def sendCancellationMessage(taskId: Int): F[Unit] = //...

sealed trait ComputationMeat
private def startComputation(meta: ComputationMeat): F[Int] = //...

def start[F[_]](meta: ComputationMeat): F[Fiber[F,Unit]] = for {
    taskId <- startComputation(meta)
} yield Fiber.apply(awaitFinishMessage(taskId),sendCancellationMessage(taskId))

问题:Fiber的一个唯一/不常见/意外的用例,用于表示可调用/可联接的 分布式 计算队列(例如Kafka或其他)作为中间件?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...