反应性akka流:如何延迟图形关闭直到源变干?

问题描述

我正在尝试从kafka经纪人向akka流图提供数据(例如10个字符串)

这是单元测试中的图表:

val consumer = AlpakkaConsumer(KafkaBootstrapServer(kafkaURL).value,"porteur-id")

val drainingControl =
  Consumer
    .plainSource(consumer.kafkaConsumerSettings,Subscriptions.topics("transaction"))
    .toMat(Sink.seq)(Keep.both)
    .mapMaterializedValue(DrainingControl.apply)
    .run()

val streamComplete = drainingControl.drainAndShutdown()
Await.result(streamComplete,Duration.Inf).size should be > 0

单元测试失败(大小等于0,预期为10)

但是,如果在我的drandAndShutdown()调用之前插入一个Thread.sleep(5000),它将通过。

给我的印象是,在不睡眠的情况下,该图形在run()调用之后直接关闭,并且没有时间甚至处理第一个消息。如果我引入了sleep命令,它可以遍历所有源内容,它会关闭图形,最后我会得到一些帮助。

我看不到我在做什么错,因为它基本上是一个片段,您可以在任何地方找到它,并作为示例使用

仅当源已传递所有消息时,如何才能触发rainAndShutDown调用

感谢您的帮助!

注意:我使用的是akka-stream-kafka 0.22版本

解决方法

根据Alpakka Kafka's docsimport React,{ useEffect,useState } from "react"; const GetLeagues = async (country) => { // This helper function fetches your leagues const url = 'your url' const res = await fetch(url); const { countries } = await res.json(); return countries; }; const Leagues = () => { const [leagues,setLeagues] = useState([]); useEffect(() => { async function init() { // Declaring an extra function as useEffect // cannot be async. const countries = await GetLeagues(); setLeagues(countries); } init(); },[]); return ( <div> {leagues.map((country,i) => { return <li key={i}>{country.strLeague}</li>; })} </div> ); }; export default Leagues; 的行为是:

停止从Source产生消息,等待流完成并关闭使用者Source,以便所有消耗的消息到达流的末尾。

这意味着您将停止在用户中接收邮件,并且它将仅处理已入队的邮件。

对于测试流,我建议您看一下Streams TestKit,因为它可以让您对要测试的流的哪些部分进行大量控制

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...