问题描述
我每 5 秒有一个流查询将数据从 Azure Eventthubs 流式传输到 ADLS,并且相同的流查询是 1 小时窗口的水印,水印延迟为 5 分钟。
代码:
val rawStreamQuery = messages.writeStream.format("delta")
.option("checkpointLocation",BASE_LOC + "checkpoint/" + RAW_SCHEMA_NAME + "/" + RAW_TASK_TABLE)
.trigger(Trigger.ProcessingTime(RAW_STREAM_TRIGGER_INTERVAL))
.table(RAW_SCHEMA_NAME + "." + RAW_TASK_TABLE)
rawStreamQuery.withWatermark(watermarkTimeStamp,STREAM_WATERMARK) //5 minutes
.groupBy(window(col(watermarkTimeStamp),STREAM_WINDOW).as("window")) //1 hour
.count()
.select(
lit(commonDataObj.getFeedName).as("Feed_name"),lit(commonDataObj.getStage).as("stage_name"),col("count").as("record_count"),col("window").getField("start").as("start_ts"),col("window").getField("end").as("end_ts")
)
出现以下错误。
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [5 minutes]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
at scala.concurrent.Await$.$anonfun$result$1(package.scala:220)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:57)
at scala.concurrent.Await$.result(package.scala:146)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.createReceiver(CachedEventHubsReceiver.scala:99)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.recreateReceiver(CachedEventHubsReceiver.scala:151)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.checkCursor(CachedEventHubsReceiver.scala:169)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.org$apache$spark$eventhubs$client$CachedEventHubsReceiver$$receive(CachedEventHubsReceiver.scala:231)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver$.receive(CachedEventHubsReceiver.scala:356)
at org.apache.spark.eventhubs.rdd.EventHubsRDD.compute(EventHubsRDD.scala:123)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:640)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:643)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)