如何将 akka 演员与衰落结合起来

问题描述

我有一个包含 akka、akka http 和 akka 流的应用程序,所以它不是休息或微服务是一个短暂的应用程序;它是来自 rest api 的数据提取器,应用程序写入 postgres 数据库,它可以工作,只是我有疑问,如果以这种方式集成下降和 akka 是否正确。

代码

object App extends CommandioApp(name = "app",header = "extractor",version = "0.1.0") {

 
  val config = Config( token,jdbcDriver = jdbcDriver,jdbcURL = jdbcURL,jdbcUser = jdbcUser,jdbcPassword = jdbcPassword,jdbcSchema = jdbcSchema)

  override def main: Opts[IO[ExitCode]] =
    Opts.subcommand(name="extract-teams",help="Extract Teams from API"){

      val tableName = Opts.option[String]("recursive",short="r",help="Recursive Extraction")
      val outputFile = Opts.option[Path]("output",short="o",help="Output file").withDefault(Paths.get("output.csv"))

      ( tableName,outputFile).mapN{( table,output) =>
        println(table)
        println(output)

        ClickUpExtractions.extractTeams()

        IO(ExitCode.Success)
      }
    }

  object ClickUpExtractions extends ActorGlobalImplicits {

    def extractTeams(): Unit ={

      import com.arkondata.extractors.ClickUpTeamsActions._
      val extractorFetcher = system.actorOf(ClickUpTeamsFetcher.props(config),name = "EngineActor")
      val extractorWriter = system.actorOf(ClickUpTeamsBulk.props(config),name = "Writer")

      extractorFetcher ! Fetch(extractorWriter)

    }

  }

}

接收代码

override def receive: Receive = {
    case WritePG(teamsData) =>
      println("-------writer")
      println(teamsData)
      val teamsPG: List[Teams] = teamsData.map(data => Teams(data.id,data.name,data.color,data.avatar))
      println(teamsPG)

      println("insert-----------------------")
      //// manejar exception aqui  importante!!!
      val upsertStatement = "insert into Teams (id,name,color,avatar) values ( ?,?,?) ON CONFLICT(id) DO UPDATE SET name = EXCLUDED.name"
      BulkWriter(config).insertMany[Teams](teamsPG,upsertStatement).unsafeRunSync()//.unsafetoFuture()//.unsafeRunSync()
      //// manejar exception aqui  importante!!!
      println("insert doobie")
      println("end------------------------")

      self ! "mensaje salida myself!!!!"
    case msg:String =>
      println(msg)
      context.system.terminate()
      System.exit(1) // this is the last step in the akka flow
  }

注意:

System.exit(1) // this is the last step in the akka flow

我还使用其他库,如 doobie、decimal、cats: https://ben.kirw.in/decline/effect.html 。 应用程序可以工作,只是我需要知道它是否正确,或者 akka 和 this 之间是否存在更好的集成:IO(ExitCode.Success)

有什么办法可以从 akka 得到一个 tell 响应并验证它,比如:

val res = extractorFetcher ! Fetch(extractorWriter)
if (validate(res)) 
   IO(ExitCode.Success)
else 

   IO(ExitCode.Error)

解决方法

一些建议:

  1. 您可以将 Akka 与其他基于 cat 的框架一起使用,但是您的应用程序中将有两种不同的术语:Fibers、ActorSystems、Futures 与 IO 效果……因此,如果您想使用 Cats,请选择Monix 或 FS2,它们直接与 Cats 效果连接。相反,如果您必须使用 Akka,请使用 Akka Streams,它有一个很棒的套件,并且只处理 Akka 调度程序和 Scala Futures。

  2. 使用 Akka 类型。自从 Akka Typed 投入生产以来,Akka 已经有了很大的改进。您正在接收函数中创建异步计算,这很容易出错。

,

Akka 生态系统依赖于 Future。 Cats Effect 依赖于一些 F[_] - 内置是 IO 但也有 Monix 的 Task 和 ZIO。

Translation Future IO 可以这样完成:

io.unsafeToFuture

IO.fromFuture(IO(start future here)) // Future start is side-effect on its own

Monix 类似:

task.runToFuture // requires Scheduler

Task.deferFuture(future thunk) // or
Task.deferFutureAction(implicit scheduler => future)

如果您使用的是无标签,则是:

Async.fromFuture(Sync[F].delay(start future)) // requires implicit Async[F]

val runToFuture: F ~> Future // AFAIR there is no popular type class for it
runToFuture(io)

如果您正在使用流,那么有 streamz 库可以处理 Akka Streams FS2 翻译。 Akka Streams、Monix 的 Observable 实现和 FS2 提供了 Reactive Streams 的实现,因此您可以使用 RS 接口让这些库相互通信,因此像 streamz 这样的库只是一个方便的工具。

AFAIK 没有其他集成,也不需要它们。

但是,我建议您了解一些基础知识,Akka 和 Future 是如何工作的(热切、记忆、执行上下文等)与 Cats Effect 的工作方式(通常是懒惰、没有记忆、除了 Clock 或 { 的初始化之外没有 EC {1}} 在 CE IO 中,CE IO 和 Monix 之间的细微差别(例如调度程序)。如果你不知道它们之间的区别,你很容易造成一些伤害。