ZIO 和多个回调

问题描述

我刚开始使用 ZIO。我目前正在 Scala 中编写一个加密交易机器人,同时我正在尝试学习 ZIO。现在我正在打开一个 websocket,这个 websocket 会提供多个回调,直到它关闭为止,我正在努力将其集成到我的代码中。我当前的代码

object Main extends zio.App with Logging {
   def run(args: List[String]): URIO[Any with Console,ExitCode] = Configuration.getConfiguration.fold(onError,start).exitCode

   private val interval: CandlestickInterval = CandlestickInterval.ONE_MINUTE

   private def onError(exception: ConfigurationException): ZIO[Any,Throwable,Unit]  = {
     logger.info("Could not initialize Traderbot!")
     logger.error(exception.getMessage)
     IO.succeed()
   }

   private final def start(configuration: Configuration): ZIO[Any,Unit] = {
      for {
        binanceClient <- IO.succeed(BinanceapiclientFactory.newInstance(configuration.apiKey,configuration.secret))
        webSocketClient <- IO.succeed(binanceClient.newWebSocketClient())
        candlesticks <- Task.effectAsync[CandlestickEvent] {
          callback =>
            webSocketClient.onCandlestickEvent(
            "adaeur",interval,d => callback(IO.succeed(d))
          )
        })
        // Todo Calculate RSI from candlesticks.
   } yield candlesticks
 }
}

我希望继续接收烛台事件并保持功能正常。我看到了一些关于 Zio Streams 的内容,但我找不到处理重复回调且易于理解的示例。现在我不能用我的烛台代码来理解。

感谢您的时间!

解决方法

遗憾的是,使用 ZIO 时,effectAsync 无法处理多个回调,因为数据类型基于单个成功或失败值。

您可以使用 ZStream 代替,但它具有类似形状的运算符,可以多次调用:

private final def start(configuration: Configuration): ZStream[Any,Throwable,Unit] = {
  val candlesticks = ZStream.unwrap(
    IO.effectTotal {
      val client = BinanceApiClientFactory
        .newInstance(configuration.apiKey,configuration.secret)
        .newWebSocketClient()

      // This variant accepts a return value in the `Left` which 
      // is called when during shutdown to make sure that the websocket is 
      // cleaned up
      ZStream.effectAsyncInterrupt { cb => 
        val closeable = webSocketClient.onCancelstickEvent(
          "adaeur",interval,d => cb(IO.succeed(d)
        ) 

        Left(UIO(closeable.close()))
      }
  )

  for {
    candlestick <- candlesticks
    // TODO Calculate RSI from candlesticks.
  } yield ()
}