问题描述
我正在尝试从kafka经纪人向akka流图提供数据(例如10个字符串)
这是单元测试中的图表:
val consumer = AlpakkaConsumer(KafkaBootstrapServer(kafkaURL).value,"porteur-id")
val drainingControl =
Consumer
.plainSource(consumer.kafkaConsumerSettings,Subscriptions.topics("transaction"))
.toMat(Sink.seq)(Keep.both)
.mapMaterializedValue(DrainingControl.apply)
.run()
val streamComplete = drainingControl.drainAndShutdown()
Await.result(streamComplete,Duration.Inf).size should be > 0
单元测试失败(大小等于0,预期为10)
但是,如果在我的drandAndShutdown()调用之前插入一个Thread.sleep(5000)
,它将通过。
给我的印象是,在不睡眠的情况下,该图形在run()调用之后直接关闭,并且没有时间甚至处理第一个消息。如果我引入了sleep命令,它可以遍历所有源内容,它会关闭图形,最后我会得到一些帮助。
我看不到我在做什么错,因为它基本上是一个片段,您可以在任何地方找到它,并作为示例使用
仅当源已传递所有消息时,如何才能触发rainAndShutDown调用?
感谢您的帮助!
注意:我使用的是akka-stream-kafka 0.22版本
解决方法
根据Alpakka Kafka's docs,import React,{ useEffect,useState } from "react";
const GetLeagues = async (country) => {
// This helper function fetches your leagues
const url = 'your url'
const res = await fetch(url);
const { countries } = await res.json();
return countries;
};
const Leagues = () => {
const [leagues,setLeagues] = useState([]);
useEffect(() => {
async function init() {
// Declaring an extra function as useEffect
// cannot be async.
const countries = await GetLeagues();
setLeagues(countries);
}
init();
},[]);
return (
<div>
{leagues.map((country,i) => {
return <li key={i}>{country.strLeague}</li>;
})}
</div>
);
};
export default Leagues;
的行为是:
停止从Source产生消息,等待流完成并关闭使用者Source,以便所有消耗的消息到达流的末尾。
这意味着您将停止在用户中接收邮件,并且它将仅处理已入队的邮件。
对于测试流,我建议您看一下Streams TestKit,因为它可以让您对要测试的流的哪些部分进行大量控制