问题描述
我有一个Spark结构化的流媒体,在我有正确的记录计数后,我需要停止流媒体播放过程。
到目前为止,我有一种方法可以在没有活动的特定时间间隔停止流式传输,如何添加一个计数器来获取计数并关闭流式传输?
val resultStream=furtherFlattening
.writeStream
.format("console")
.option("truncate","false")
.trigger(Trigger.ProcessingTime(5,TimeUnit.SECONDS))
//. trigger(Trigger.ProcessingTime(5,TimeUnit.MINUTES))
.start()
.awaitTermination()
def stopStreamQuery(query: StreamingQuery,awaitTerminationTimeMs: Long,spark:SparkSession) {
while (query.isActive) {
val msg = query.status.message
if (!query.status.isDataAvailable
&& !query.status.isTriggerActive
&& !msg.equals("Initializing sources")) {
query.stop()
spark.close()
}
query.awaitTermination(awaitTerminationTimeMs)
}
}
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)