如何使用 redisson 和 ZIO 监听 redis 列表事件

问题描述

对于 Radisson,只需接收事件即可将新项目添加到列表中。为此,您需要执行以下操作:

object Test extends App {
  val redisson = Redisson.create()
  val events = redisson.getQueue[String]("minio_events",new StringCodec())
  events.addListener(new ListAddListener() {
    override def onListAdd(o: String): Unit = println(o)
  })
}

当它需要被包裹在 ZIO 中时,困难就开始了。如何在 ZIO 或 ZStream 中包装此事件以启动事件处理链?

解决方法

看起来 Redisson 支持将 def message_return(messages=None): default_messages = [" "," "," "] if messages is not None: messages = messages + default_messages[len(messages):] else: messages = default_messages.copy() return messages print(message_return(["test"])) # ['test',' ',' '] 转换为反应流客户端,该客户端有一个 zio-interop。但是如果你只想直接使用 java 接口,我认为你可以做这样的事情(注意我没有实际测试过):

RedissonClient
,

ZStream 是基于拉取的,这意味着您必须以某种方式从 minio_events 拉取数据

val redisson = Redisson.create()    
val bqueue : RQueue[String] = redisson.getQueue("minio_events",new StringCodec())
    
val pollQueue = 
  ZIO
    .effect(Option(bqueue.poll())) // RQueue.poll returns null if the queue is empty
    .someOrFail(NoElementsOnStream)

这会创建一个 ZIO[Any,Throwable,String] 代表您的轮询操作,现在可以通过调用 ZStream 方法将其转换为 ZStream.fromEffect

ZStream
  .fromEffect(pollQueue)
  .foreach(s => putStrLn(s))
  .exitCode

如果您将此代码放在 zio.App 主函数中,您会看到它只运行一次。所以我们需要让它永远运行并重试直到找到另一个元素

ZStream
  .fromEffect(pollQueue)
  .retry(Schedule.spaced(1.second))
  .forever
  .foreach(s => putStrLn(s))  
  .exitCode