参考更新和使用 Cats 效应的光纤触发器

问题描述

问题: 我正在尝试解决一个问题,我需要每 x 分钟安排一次,我需要更新缓存并且可以并发获取。>

尝试过的解决方案:

  1. 将 TrieMap 和 ScheduledThreadPool Executor 与 Cats Effects 结合使用:

我实际上是从使用 TrieMap 开始的,因为它提供了线程安全性并使用了调度线程池来调度更新

import cats.Applicative.ops.toAllApplicativeOps
import cats.effect.concurrent.Ref
import cats.effect.{ExitCode,IO,IOApp}

import java.util.concurrent.{Executors,ScheduledExecutorService}
import scala.collection.concurrent.TrieMap
import scala.concurrent.duration.{DurationInt,FiniteDuration}
import scala.util.Random

object ExploreTrieMap extends IOApp {
  def callForEvery[A](f: => Unit,d: FiniteDuration)
                     (implicit sc: ScheduledExecutorService): IO[Unit] = {
    IO.cancelable {
      cb =>
        val r = new Runnable {
          override def run(): Unit = cb(Right(f))
        }
        val scFut = sc.scheduleAtFixedRate(r,d.length,d.unit)
        IO(scFut.cancel(false)).void
    }
  }

  val map = TrieMap.empty[String,String]
  override def run(args: List[String]): IO[ExitCode] = {
    implicit val scheduler: ScheduledExecutorService = Executors.newScheduledThreadPool(1)
    for {
      _ <- callForEvery(println(map.get("token")),1 second)
      _ <- callForEvery(println(map.put("token",Random.nextString(10))),3 second)
    } yield ExitCode.Success
  }
}

  1. 使用 Ref 和 Cats Effect 纤维:

然后创造了一个纯粹的猫效应解决方案。

下面的代码会导致 StackOverflow 错误吗?

import cats.effect.concurrent.Ref
import cats.effect.{ContextShift,ExitCode,Fiber,IOApp}

import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt,FiniteDuration}
import scala.util.Random

object ExploreCatFiber extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    for {
      ref <- Ref.of[IO,String]("")
      s <- scheduleAndPopulate(ref,1 minute)
      r <- keepPollingUsingFiber(ref)
      _ <- s.join
      _ <- r.join
    } yield ExitCode.Success
  }

  def populate(): Future[String] = Future.successful(Random.nextString(10))

  val futPop = IO.fromFuture(IO(populate()))

  def scheduleAndPopulate(r: Ref[IO,String],duration: FiniteDuration)(implicit cs: ContextShift[IO]): IO[Fiber[IO,Unit]] = {
    (for {
      _ <- IO(println("Scheduled For Populating Ref"))
      res <- futPop
      _ <- r.set(res)
      _ <- IO.sleep(duration)
      rS <- scheduleAndPopulate(r,duration)(cs)
      _ <- rS.join
    } yield ()).start(cs)
  }


  def keepPollingUsingFiber(r: Ref[IO,String])(implicit cs: ContextShift[IO]): IO[Fiber[IO,Unit]] = {
    (for {
      res <- r.get
      _ <- IO(println(res))
      _ <- IO.sleep(1 second)
      w <- keepPollingUsingFiber(r)(cs)
      _ <- w.join
    } yield ()).start(cs)
  }
}

我正在尝试更新一个 Ref,并将该 Ref 用作正在由另一个光纤更新的并发缓存。我正在使用递归触发光纤创建。我知道纤维可用于堆栈安全操作。在这种情况下,我将加入创建的旧光纤。所以想了解以下代码是否安全。

更新(下面提供的答案的解决方案)

第三种解决方案:基于答案之一的输入。与其为每个递归调用分叉,不如将其分叉给调用方。

import cats.effect.concurrent.Ref
import cats.effect.{ContextShift,FiniteDuration}
import scala.util.Random

object ExploreCatFiberWithIO extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    for {
      ref <- Ref.of[IO,String]("")
      s <- scheduleAndPopulateWithIO(ref,1 second).start
      r <- keepPollingUsingIO(ref).start
      _ <- s.join
      _ <- r.join
    } yield ExitCode.Success
  }

  def populate(): Future[String] = Future.successful(Random.nextString(10))

  val futPop = IO.fromFuture(IO(populate()))

  def scheduleAndPopulateWithIO(r: Ref[IO,duration: FiniteDuration)(implicit cs: ContextShift[IO]): IO[Unit] = {
    for {
      _ <- IO(println("Scheduled For Populating Ref"))
      res <- futPop
      _ <- r.set(res)
      _ <- IO.sleep(duration)
      _ <- scheduleAndPopulateWithIO(r,duration)(cs)
    } yield ()
  }

  def keepPollingUsingIO(r: Ref[IO,String])(implicit cs: ContextShift[IO]): IO[Unit] = {
    (for {
      res <- r.get
      _ <- IO(println(res))
      _ <- IO.sleep(1 second)
      w <- keepPollingUsingIO(r)(cs)
    } yield ())
  }
}

很想知道上述方法的优缺点。

解决方法

对于第二种方法,您可以通过不在 FiberscheduleAndPopulate 中分叉 keepPollingUsingFiber 来使其更简单。相反,保留递归调用,并将它们分叉到调用者中。 IO 是堆栈安全的,因此递归调用不会炸毁堆栈。

您可以使用 start 对每个进行分叉,但 parTupled 可能更简单。它是 parMapN 的一种变体,用于分叉每个效果并收集它们的结果。

(此外,在您的代码中,您不需要显式传递隐式值,例如 cs,编译器会为您推断它们。)

object ExploreCatFiber extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    for {
      ref <- Ref.of[IO,String]("")
      _ <- (scheduleAndPopulate(ref,1 minute),keepPollingUsingFiber(ref)).parTupled
    } yield ExitCode.Success
  }

  def populate(): Future[String] = Future.successful(Random.nextString(10))

  val futPop = IO.fromFuture(IO(populate()))

  def scheduleAndPopulate(r: Ref[IO,String],duration: FiniteDuration): IO[Unit] = {
    (for {
      _ <- IO(println("Scheduled For Populating Ref"))
      res <- futPop
      _ <- r.set(res)
      _ <- IO.sleep(duration)
      _ <- scheduleAndPopulate(r,duration)
    } yield ()
  }

  def keepPollingUsingFiber(r: Ref[IO,String]): IO[Unit] = {
    (for {
      res <- r.get
      _ <- IO(println(res))
      _ <- IO.sleep(1 second)
      _ <- keepPollingUsingFiber(r)
    } yield ()
  }
}

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...