问题描述
我希望创建一个调度程序,它为每个给定的时间段打勾并从多个表中传输数据。如果下游接收器变慢,我希望自动收报机也变慢。有了它,我可以提供背压。
所以总的来说,我结合了一个解决方案,但我结束了 Source[Source[_]]
,因为外部 Source 是独立于内部 Source 性能的代码。
这是一个简化版本的代码,我创建了 Slick 源来从数据库中获取数据;
val source = Source.tick(initialDelay.second,1.second,"ss")
.map(_ => getSegments)
.mapConcat { case segments =>
segments.map { s =>
val newUsers = Slick.source(sql"SELECT hguser_id FROM sys_app_${s.segment.appId}_data".as[Int]).map(id => (id,s.newData))
val allUsers = Slick.source(sql"SELECT hguser_id FROM sys_app_${s.segment.appId}_someotherdata".as[Int]).map(id => (id,s.wholeSegment))
Source.combine(newUsers,allUsers)(Merge(_))
}
}
最终我想从多个表中流式传输数据。
谢谢。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)