如何在 Akka 中将刻度源与内部源流相结合

问题描述

我希望创建一个调度程序,它为每个给定的时间段打勾并从多个表中传输数据。如果下游接收器变慢,我希望自动收报机也变慢。有了它,我可以提供背压。

所以总的来说,我结合了一个解决方案,但我结束了 Source[Source[_]],因为外部 Source 是独立于内部 Source 性能代码

这是一个简化版本的代码,我创建了 Slick 源来从数据库获取数据;

val source = Source.tick(initialDelay.second,1.second,"ss")
    .map(_ => getSegments)
    .mapConcat { case segments =>
      segments.map { s =>

        val newUsers = Slick.source(sql"SELECT hguser_id FROM sys_app_${s.segment.appId}_data".as[Int]).map(id => (id,s.newData))
       
        val allUsers = Slick.source(sql"SELECT hguser_id FROM sys_app_${s.segment.appId}_someotherdata".as[Int]).map(id => (id,s.wholeSegment))
        
        Source.combine(newUsers,allUsers)(Merge(_))
      }
    }

最终我想从多个表中流式传输数据。

谢谢。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...