问题描述
我正在寻找一种方法来懒惰地组合两个效果,而无需先在 Zio 中执行它们的结果。我的程序采用以下形式:
red blue purple
在当前的公式中,/**
* Returns a reference to an effectful singleton cron scheduler backed by akka
* See https://github.com/philcali/cronish for more info on the API
*/
def scheduled: UManaged[Ref[Scheduled]] = ???
def schedule[R,E,A](e: => ZIO[R,A],crondef: String) =
(for {
resource <- scheduled
task <- ZManaged.fromEffect(e) // I need to lift the underlying effect here,not access its result
} yield resource.modify(schedule => schedule(job(task),crondef.cron) -> schedule)).flattenM
def scheduleEffect[A](e: => A,description: String = "")(crondef: String) =
(for {
resource <- scheduled
} yield resource.modify(schedule => schedule(job(e),crondef.cron) -> schedule)).flattenM
// Program which schedules cron jobs to increment/decrement x and y,respectively
def run(args: List[String]): URIO[ZEnv,ExitCode] = {
var x = 0
var y = 100
(for {
_ <- Scheduler.schedule(UIO({ x += 1; println(x) }),"every second")
_ <- Scheduler.scheduleEffect({ y -= 1; println(y) },"every second")
} yield ())
.provideCustomLayer(???)
.as(ExitCode.success)
.useForever
}
的递减每秒运行一次,直到程序终止,而 y
的递增仅运行一次。我知道 ZIO 提供了一个 x
实用程序,但出于遗留兼容性原因,我必须坚持使用 Cronish 库使用的有效单例。基本上,Schedule
采用 job
类型的传递引用效果并将其挂起在 A
中,以便根据 {{1} 定义的时间表在 CronTask
单例中执行}}。
我想知道是否可以自己组合 效果,而不是在 ZIO 的上下文中组合它们的结果?我基本上已经将遗留的 cron 调度程序包装在 ZIO 数据类型中以正确管理并发性,但我仍然需要代码中其他 ZIO 签名方法的暂停效果可供我传递给调度程序。
解决方法
我最终通过通读 ZIO.effectAsyncM
的源代码找到了解决方案,特别指出它对 ZIO.runtime[R]
的引用:
/**
* Imports an asynchronous effect into a pure `ZIO` value. This formulation is
* necessary when the effect is itself expressed in terms of `ZIO`.
*/
def effectAsyncM[R,E,A](
register: (ZIO[R,A] => Unit) => ZIO[R,Any]
): ZIO[R,A] =
for {
p <- Promise.make[E,A]
r <- ZIO.runtime[R] // This line right here!
a <- ZIO.uninterruptibleMask { restore =>
val f = register(k => r.unsafeRunAsync_(k.to(p)))
restore(f.catchAllCause(p.halt)).fork *> restore(p.await)
}
} yield a
虽然我无法在 ZIO 文档中找到对这种方法的直接引用,但 Scaladoc 已经足够清楚了:
Returns an effect that accesses the runtime,which can be used to (unsafely) execute tasks. This is useful for integration with legacy code that must call back into ZIO code
。
有了这个,我的新实现效果很好,如下所示:
def schedule[R,A](e: => ZIO[R,A],crondef: String) =
(for {
resource <- scheduled
runtime <- ZManaged.fromEffect(ZIO.runtime[R])
} yield resource.modify({
schedule => schedule(job(runtime.unsafeRun(e)),crondef.cron) -> schedule
})).flattenM