问题描述
对于 Radisson,只需接收事件即可将新项目添加到列表中。为此,您需要执行以下操作:
object Test extends App {
val redisson = Redisson.create()
val events = redisson.getQueue[String]("minio_events",new StringCodec())
events.addListener(new ListAddListener() {
override def onListAdd(o: String): Unit = println(o)
})
}
当它需要被包裹在 ZIO 中时,困难就开始了。如何在 ZIO 或 ZStream 中包装此事件以启动事件处理链?
解决方法
看起来 Redisson 支持将 def message_return(messages=None):
default_messages = [" "," "," "]
if messages is not None:
messages = messages + default_messages[len(messages):]
else:
messages = default_messages.copy()
return messages
print(message_return(["test"])) # ['test',' ',' ']
转换为反应流客户端,该客户端有一个 zio-interop。但是如果你只想直接使用 java 接口,我认为你可以做这样的事情(注意我没有实际测试过):
RedissonClient
,
ZStream 是基于拉取的,这意味着您必须以某种方式从 minio_events
拉取数据
val redisson = Redisson.create()
val bqueue : RQueue[String] = redisson.getQueue("minio_events",new StringCodec())
val pollQueue =
ZIO
.effect(Option(bqueue.poll())) // RQueue.poll returns null if the queue is empty
.someOrFail(NoElementsOnStream)
这会创建一个 ZIO[Any,Throwable,String]
代表您的轮询操作,现在可以通过调用 ZStream
方法将其转换为 ZStream.fromEffect
ZStream
.fromEffect(pollQueue)
.foreach(s => putStrLn(s))
.exitCode
如果您将此代码放在 zio.App
主函数中,您会看到它只运行一次。所以我们需要让它永远运行并重试直到找到另一个元素
ZStream
.fromEffect(pollQueue)
.retry(Schedule.spaced(1.second))
.forever
.foreach(s => putStrLn(s))
.exitCode