如何获取结构化流中的记录数计数?

问题描述

我有一个Spark结构化的流媒体,在我有正确的记录计数后,我需要停止流媒体播放过程。

到目前为止,我有一种方法可以在没有活动的特定时间间隔停止流式传输,如何添加一个计数器来获取计数并关闭流式传输?

 val resultStream=furtherFlattening
      .writeStream
      .format("console")
      .option("truncate","false")
      .trigger(Trigger.ProcessingTime(5,TimeUnit.SECONDS))
      //. trigger(Trigger.ProcessingTime(5,TimeUnit.MINUTES))
      .start()

      .awaitTermination()


def stopStreamQuery(query: StreamingQuery,awaitTerminationTimeMs: Long,spark:SparkSession) {

  while (query.isActive) {
    val msg = query.status.message
    if (!query.status.isDataAvailable
      && !query.status.isTriggerActive
      && !msg.equals("Initializing sources")) {
      query.stop()
      spark.close()
    }
    query.awaitTermination(awaitTerminationTimeMs)
  }
}

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...