问题描述
我有一个包含 akka、akka http 和 akka 流的应用程序,所以它不是休息或微服务是一个短暂的应用程序;它是来自 rest api 的数据提取器,应用程序写入 postgres 数据库,它可以工作,只是我有疑问,如果以这种方式集成下降和 akka 是否正确。
代码:
object App extends CommandioApp(name = "app",header = "extractor",version = "0.1.0") {
val config = Config( token,jdbcDriver = jdbcDriver,jdbcURL = jdbcURL,jdbcUser = jdbcUser,jdbcPassword = jdbcPassword,jdbcSchema = jdbcSchema)
override def main: Opts[IO[ExitCode]] =
Opts.subcommand(name="extract-teams",help="Extract Teams from API"){
val tableName = Opts.option[String]("recursive",short="r",help="Recursive Extraction")
val outputFile = Opts.option[Path]("output",short="o",help="Output file").withDefault(Paths.get("output.csv"))
( tableName,outputFile).mapN{( table,output) =>
println(table)
println(output)
ClickUpExtractions.extractTeams()
IO(ExitCode.Success)
}
}
object ClickUpExtractions extends ActorGlobalImplicits {
def extractTeams(): Unit ={
import com.arkondata.extractors.ClickUpTeamsActions._
val extractorFetcher = system.actorOf(ClickUpTeamsFetcher.props(config),name = "EngineActor")
val extractorWriter = system.actorOf(ClickUpTeamsBulk.props(config),name = "Writer")
extractorFetcher ! Fetch(extractorWriter)
}
}
}
接收代码:
override def receive: Receive = {
case WritePG(teamsData) =>
println("-------writer")
println(teamsData)
val teamsPG: List[Teams] = teamsData.map(data => Teams(data.id,data.name,data.color,data.avatar))
println(teamsPG)
println("insert-----------------------")
//// manejar exception aqui importante!!!
val upsertStatement = "insert into Teams (id,name,color,avatar) values ( ?,?,?) ON CONFLICT(id) DO UPDATE SET name = EXCLUDED.name"
BulkWriter(config).insertMany[Teams](teamsPG,upsertStatement).unsafeRunSync()//.unsafetoFuture()//.unsafeRunSync()
//// manejar exception aqui importante!!!
println("insert doobie")
println("end------------------------")
self ! "mensaje salida myself!!!!"
case msg:String =>
println(msg)
context.system.terminate()
System.exit(1) // this is the last step in the akka flow
}
注意:
System.exit(1) // this is the last step in the akka flow
我还使用其他库,如 doobie、decimal、cats: https://ben.kirw.in/decline/effect.html 。
应用程序可以工作,只是我需要知道它是否正确,或者 akka 和 this 之间是否存在更好的集成:IO(ExitCode.Success)
有什么办法可以从 akka 得到一个 tell 响应并验证它,比如:
val res = extractorFetcher ! Fetch(extractorWriter)
if (validate(res))
IO(ExitCode.Success)
else
IO(ExitCode.Error)
解决方法
一些建议:
-
您可以将 Akka 与其他基于 cat 的框架一起使用,但是您的应用程序中将有两种不同的术语:Fibers、ActorSystems、Futures 与 IO 效果……因此,如果您想使用 Cats,请选择Monix 或 FS2,它们直接与 Cats 效果连接。相反,如果您必须使用 Akka,请使用 Akka Streams,它有一个很棒的套件,并且只处理 Akka 调度程序和 Scala Futures。
-
使用 Akka 类型。自从 Akka Typed 投入生产以来,Akka 已经有了很大的改进。您正在接收函数中创建异步计算,这很容易出错。
Akka 生态系统依赖于 Future
。 Cats Effect 依赖于一些 F[_]
- 内置是 IO
但也有 Monix 的 Task
和 ZIO。
Translation Future IO 可以这样完成:
io.unsafeToFuture
IO.fromFuture(IO(start future here)) // Future start is side-effect on its own
Monix 类似:
task.runToFuture // requires Scheduler
Task.deferFuture(future thunk) // or
Task.deferFutureAction(implicit scheduler => future)
如果您使用的是无标签,则是:
Async.fromFuture(Sync[F].delay(start future)) // requires implicit Async[F]
val runToFuture: F ~> Future // AFAIR there is no popular type class for it
runToFuture(io)
如果您正在使用流,那么有 streamz 库可以处理 Akka Streams FS2 翻译。 Akka Streams、Monix 的 Observable 实现和 FS2 提供了 Reactive Streams 的实现,因此您可以使用 RS 接口让这些库相互通信,因此像 streamz 这样的库只是一个方便的工具。
AFAIK 没有其他集成,也不需要它们。
但是,我建议您了解一些基础知识,Akka 和 Future 是如何工作的(热切、记忆、执行上下文等)与 Cats Effect 的工作方式(通常是懒惰、没有记忆、除了 Clock
或 { 的初始化之外没有 EC {1}} 在 CE IO 中,CE IO 和 Monix 之间的细微差别(例如调度程序)。如果你不知道它们之间的区别,你很容易造成一些伤害。