Spark Streaming checkpoint issue with Azure Blob storage: Error in TaskCompletionListener null

Problem description

I am using Spark Structured Streaming with checkpointing enabled, and the checkpoint metadata is stored in Azure Blob storage.

However, I am hitting the error below; from the logs it looks like Spark deletes the temporary delta file and then tries to access it again.

The detailed logs and the query's logical plan are below.
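For context, the checkpoint location is configured roughly as in the sketch below. The storage account, container, key, and sink path are placeholders rather than the actual values, and a rate source stands in for the real Event Hubs input just so the snippet is self-contained:

// Minimal Scala sketch: a streaming query checkpointing to Azure Blob over wasb://.
// Account, container, key, and paths are placeholders, not the real configuration.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("wasb-checkpoint-sketch")
  .getOrCreate()

// Storage account credentials for the container that backs the checkpoint directory.
spark.sparkContext.hadoopConfiguration.set(
  "fs.azure.account.key.<account>.blob.core.windows.net",
  "<storage-account-key>")

// Placeholder source; the real query reads from two Event Hubs streams (see the logical plan below).
val input = spark.readStream.format("rate").load()

val query = input.writeStream
  .format("parquet") // FileStreamSink, matching the stack trace
  .option("path", "wasb://<container>@<account>.blob.core.windows.net/output")
  .option("checkpointLocation",
    "wasb://<container>@<account>.blob.core.windows.net/streamCheckpoint")
  .start()

query.awaitTermination()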

Logs

2021-02-10 20:28:55.633 DEBUG --- [Executor task launch worker for task 15] org.apache.hadoop.fs.azure.NativeAzureFileSystem : Delete Successful for : wasb://mosaic-k8s-hdi-container@mosaick8shdistorage.blob.core.windows.net/k8s_performance/streamCheckpoint/c0c92f4f-2708-412c-82c1-053b32ba63c9pot5/MysqL1/state/0/2/.1.delta.3554a10d-9d96-4ddb-84f1-a49334757dc9.TID15.tmp
2021-02-10 20:28:55.634 ERROR --- [Executor task launch worker for task 15] org.apache.spark.internal.Logging$class : Error in TaskCompletionListener
java.lang.NullPointerException
    at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.write(NativeAzureFileSystem.java:1140)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at net.jpountz.lz4.LZ4BlockOutputStream.finish(LZ4BlockOutputStream.java:258)
    at net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStream.java:190)
    at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
    at org.apache.commons.io.IOUtils.closeQuietly(IOUtils.java:363)
    at org.apache.commons.io.IOUtils.closeQuietly(IOUtils.java:317)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$cancelDeltaFile(HDFSBackedStateStoreProvider.scala:508)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.abort(HDFSBackedStateStoreProvider.scala:150)
    at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1$$anonfun$apply$1.apply(package.scala:65)
    at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1$$anonfun$apply$1.apply(package.scala:64)
    at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:131)
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:117)
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:117)
    at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:130)
    at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:128)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:128)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2021-02-10 20:28:55.639 ERROR --- [Executor task launch worker for task 15] org.apache.spark.internal.Logging$class : Exception in task 2.0 in stage 3.0 (TID 15)
org.apache.spark.util.TaskCompletionListenerException: null
    at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2021-02-10 20:28:55.658 DEBUG --- [dispatcher-event-loop-0] org.apache.spark.internal.Logging$class : parentName:,name: TaskSet_3.0,runningTasks: 3
2021-02-10 20:28:55.659 DEBUG --- [dispatcher-event-loop-0] org.apache.spark.internal.Logging$class : No tasks for locality level NO_PREF,so moving to locality level ANY
2021-02-10 20:28:55.661 WARN  --- [task-result-getter-1] org.apache.spark.internal.Logging$class : Lost task 2.0 in stage 3.0 (TID 15,localhost,executor driver): org.apache.spark.util.TaskCompletionListenerException: null
    at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

2021-02-10 20:28:55.663 ERROR --- [task-result-getter-1] org.apache.spark.internal.Logging$class : Task 2 in stage 3.0 failed 1 times; aborting job
2021-02-10 20:28:55.672 INFO  --- [dag-scheduler-event-loop] org.apache.spark.internal.Logging$class : Cancelling stage 3
2021-02-10 20:28:55.673 INFO  --- [dag-scheduler-event-loop] org.apache.spark.internal.Logging$class : Killing all running tasks in stage 3: Stage cancelled
2021-02-10 20:28:55.678 INFO  --- [dispatcher-event-loop-2] org.apache.spark.internal.Logging$class : Executor is trying to kill task 3.0 in stage 3.0 (TID 16),reason: Stage cancelled
2021-02-10 20:28:55.679 INFO  --- [dispatcher-event-loop-2] org.apache.spark.internal.Logging$class : Executor is trying to kill task 0.0 in stage 3.0 (TID 13),reason: Stage cancelled
2021-02-10 20:28:55.679 INFO  --- [dispatcher-event-loop-2] org.apache.spark.internal.Logging$class : Executor is trying to kill task 1.0 in stage 3.0 (TID 14),reason: Stage cancelled
2021-02-10 20:28:55.680 INFO  --- [dag-scheduler-event-loop] org.apache.spark.internal.Logging$class : Stage 3 was cancelled
2021-02-10 20:28:55.681 INFO  --- [dag-scheduler-event-loop] org.apache.spark.internal.Logging$class : ResultStage 3 (start at taskCompletion.java:414) failed in 47.809 s due to Job aborted due to stage failure: Task 2 in stage 3.0 failed 1 times,most recent failure: Lost task 2.0 in stage 3.0 (TID 15,executor driver): org.apache.spark.util.TaskCompletionListenerException: null
    at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
2021-02-10 20:28:55.687 DEBUG --- [dag-scheduler-event-loop] org.apache.spark.internal.Logging$class : After removal of stage 2,remaining stages = 3
2021-02-10 20:28:55.687 DEBUG --- [dag-scheduler-event-loop] org.apache.spark.internal.Logging$class : After removal of stage 1,remaining stages = 2
2021-02-10 20:28:55.687 DEBUG --- [dag-scheduler-event-loop] org.apache.spark.internal.Logging$class : After removal of stage 3,remaining stages = 1
2021-02-10 20:28:55.688 DEBUG --- [dag-scheduler-event-loop] org.apache.spark.internal.Logging$class : After removal of stage 0,remaining stages = 0
2021-02-10 20:28:55.689 INFO  --- [stream execution thread for [id = 3e8b8ac2-f836-44e1-b71f-9e0c030ccc79,runId = 2222948c-53e5-411a-ad22-7083bc10ed5f]] org.apache.spark.internal.Logging$class : Job 0 failed: start at taskCompletion.java:414,took 244.059233 s
2021-02-10 20:28:55.691 ERROR --- [stream execution thread for [id = 3e8b8ac2-f836-44e1-b71f-9e0c030ccc79,runId = 2222948c-53e5-411a-ad22-7083bc10ed5f]] org.apache.spark.internal.Logging$class : Aborting job 0b63e841-559f-4106-a3f8-f680fc7fdfcc.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 3.0 failed 1 times,executor driver): org.apache.spark.util.TaskCompletionListenerException: null
    at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
    at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:131)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:537)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.spark.util.TaskCompletionListenerException: null
    at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Logical plan

=== Streaming Query ===
Identifier: [id = 3e8b8ac2-f836-44e1-b71f-9e0c030ccc79,runId = 2222948c-53e5-411a-ad22-7083bc10ed5f]
Current Committed Offsets: {}
Current Available Offsets: {org.apache.spark.sql.eventhubs.EventHubsSource@5a86ea54: {"test":{"0":49743985}},org.apache.spark.sql.eventhubs.EventHubsSource@108ea531: {"sample":{"2":155436179,"5":155434270,"4":155415112,"7":155434738,"1":155439493,"3":155417225,"6":155429651,"0":155439349}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [GrpCol#1515,COL_1_COUNT#2554L,window#2136-T60000ms.start AS start#2561,window#2136-T60000ms.end AS end#2562]
+- SubqueryAlias `groupby1`
   +- Project [GrpCol#1515,window#2136-T60000ms,count(COL_1)#2549L AS COL_1_COUNT#2554L]
      +- Aggregate [GrpCol#1515,window#2550-T60000ms],[GrpCol#1515,window#2550-T60000ms AS window#2136-T60000ms,count(COL_1#1310L) AS count(COL_1)#2549L]
         +- Filter isnotnull(TIMESTAMP_COL_3#1514-T60000ms)
            +- Project [named_struct(start,precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(TIMESTAMP_COL_3#1514-T60000ms,TimestampType,LongType) - 0) as double) / cast(60000000 as double))) as double) = (cast((precisetimestampconversion(TIMESTAMP_COL_3#1514-T60000ms,LongType) - 0) as double) / cast(60000000 as double))) THEN (CEIL((cast((precisetimestampconversion(TIMESTAMP_COL_3#1514-T60000ms,LongType) - 0) as double) / cast(60000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(TIMESTAMP_COL_3#1514-T60000ms,LongType) - 0) as double) / cast(60000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 60000000) + 0),LongType,TimestampType),end,precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(TIMESTAMP_COL_3#1514-T60000ms,LongType) - 0) as double) / cast(60000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 60000000) + 0) + 60000000),TimestampType)) AS window#2550-T60000ms,COL_1#1310L,COL_10#1311,COL_100#1312,COL_101#1313L,COL_102#1314L,COL_103#1315L,COL_104#1316L,COL_105#1317,COL_106#1318L,COL_107#1319L,COL_108#1320L,COL_109#1321,COL_11#1322L,COL_110#1323,COL_111#1324,COL_112#1325,COL_113#1326L,COL_114#1327,COL_115#1328L,COL_116#1329L,COL_117#1330L,COL_118#1331,COL_119#1332,... 389 more fields]
               +- EventTimeWatermark TIMESTAMP_COL_3#1514: timestamp,interval 1 minutes
                  +- Project [COL_1#48L AS COL_1#1310L,COL_10#49 AS COL_10#1311,COL_100#50 AS COL_100#1312,COL_101#51L AS COL_101#1313L,COL_102#52L AS COL_102#1314L,COL_103#53L AS COL_103#1315L,COL_104#54L AS COL_104#1316L,COL_105#55 AS COL_105#1317,COL_106#56L AS COL_106#1318L,COL_107#57L AS COL_107#1319L,COL_108#58L AS COL_108#1320L,COL_109#59 AS COL_109#1321,COL_11#60L AS COL_11#1322L,COL_110#61 AS COL_110#1323,COL_111#62 AS COL_111#1324,COL_112#63 AS COL_112#1325,COL_113#64L AS COL_113#1326L,COL_114#65 AS COL_114#1327,COL_115#66L AS COL_115#1328L,COL_116#67L AS COL_116#1329L,COL_117#68L AS COL_117#1330L,COL_118#69 AS COL_118#1331,COL_119#70 AS COL_119#1332,COL_12#71L AS COL_12#1333L,... 388 more fields]
                     +- Join Inner,(COL_1#48L = COL_1#703L)
                        :- SubqueryAlias `watermark1`
                        :  +- EventTimeWatermark TIMESTAMP_COL_3#447: timestamp,interval 1 minutes
                        :     +- Project [COL_1#48L,COL_10#49,COL_100#50,COL_101#51L,COL_102#52L,COL_103#53L,COL_104#54L,COL_105#55,COL_106#56L,COL_107#57L,COL_108#58L,COL_109#59,COL_11#60L,COL_110#61,COL_111#62,COL_112#63,COL_113#64L,COL_114#65,COL_115#66L,COL_116#67L,COL_117#68L,COL_118#69,COL_119#70,COL_12#71L,... 182 more fields]
                        :        +- SubqueryAlias `eventhub1`
                        :           +- Project [LIST#37.COL_1 AS COL_1#48L,LIST#37.COL_10 AS COL_10#49,LIST#37.COL_100 AS COL_100#50,LIST#37.COL_101 AS COL_101#51L,LIST#37.COL_102 AS COL_102#52L,LIST#37.COL_103 AS COL_103#53L,LIST#37.COL_104 AS COL_104#54L,LIST#37.COL_105 AS COL_105#55,LIST#37.COL_106 AS COL_106#56L,LIST#37.COL_107 AS COL_107#57L,LIST#37.COL_108 AS COL_108#58L,LIST#37.COL_109 AS COL_109#59,LIST#37.COL_11 AS COL_11#60L,LIST#37.COL_110 AS COL_110#61,LIST#37.COL_111 AS COL_111#62,LIST#37.COL_112 AS COL_112#63,LIST#37.COL_113 AS COL_113#64L,LIST#37.COL_114 AS COL_114#65,LIST#37.COL_115 AS COL_115#66L,LIST#37.COL_116 AS COL_116#67L,LIST#37.COL_117 AS COL_117#68L,LIST#37.COL_118 AS COL_118#69,LIST#37.COL_119 AS COL_119#70,LIST#37.COL_12 AS COL_12#71L,... 180 more fields]
                        :              +- Project [jsontostructs(StructField(COL_1,true),StructField(COL_10,BooleanType,StructField(COL_100,StructField(COL_101,StructField(COL_102,StructField(COL_103,StructField(COL_104,StructField(COL_105,StringType,StructField(COL_106,StructField(COL_107,StructField(COL_108,StructField(COL_109,StructField(COL_11,StructField(COL_110,DoubleType,StructField(COL_111,StructField(COL_112,StructField(COL_113,StructField(COL_114,StructField(COL_115,StructField(COL_116,StructField(COL_117,StructField(COL_118,StructField(COL_119,StructField(COL_12,... 173 more fields) AS LIST#37,body#27,partition#10,offset#11,sequenceNumber#12L,enqueuedTime#13,publisher#14,partitionKey#15,properties#16,systemProperties#17]
                        :                 +- Project [cast(body#9 as string) AS body#27,systemProperties#17]
                        :                    +- StreamingExecutionRelation org.apache.spark.sql.eventhubs.EventHubsSource@108ea531,[body#9,systemProperties#17]
                        +- SubqueryAlias `watermark2`
                           +- EventTimeWatermark TIMESTAMP_COL_3#1102: timestamp,interval 1 minutes
                              +- Project [COL_1#703L,COL_10#704,COL_100#705,COL_101#706L,COL_102#707L,COL_103#708L,COL_104#709L,COL_105#710,COL_106#711L,COL_107#712L,COL_108#713L,COL_109#714,COL_11#715L,COL_110#716,COL_111#717,COL_112#718,COL_113#719L,COL_114#720,COL_115#721L,COL_116#722L,COL_117#723L,COL_118#724,COL_119#725,COL_12#726L,... 182 more fields]
                                 +- SubqueryAlias `eventhub2`
                                    +- Project [LIST#692.COL_1 AS COL_1#703L,LIST#692.COL_10 AS COL_10#704,LIST#692.COL_100 AS COL_100#705,LIST#692.COL_101 AS COL_101#706L,LIST#692.COL_102 AS COL_102#707L,LIST#692.COL_103 AS COL_103#708L,LIST#692.COL_104 AS COL_104#709L,LIST#692.COL_105 AS COL_105#710,LIST#692.COL_106 AS COL_106#711L,LIST#692.COL_107 AS COL_107#712L,LIST#692.COL_108 AS COL_108#713L,LIST#692.COL_109 AS COL_109#714,LIST#692.COL_11 AS COL_11#715L,LIST#692.COL_110 AS COL_110#716,LIST#692.COL_111 AS COL_111#717,LIST#692.COL_112 AS COL_112#718,LIST#692.COL_113 AS COL_113#719L,LIST#692.COL_114 AS COL_114#720,LIST#692.COL_115 AS COL_115#721L,LIST#692.COL_116 AS COL_116#722L,LIST#692.COL_117 AS COL_117#723L,LIST#692.COL_118 AS COL_118#724,LIST#692.COL_119 AS COL_119#725,LIST#692.COL_12 AS COL_12#726L,... 180 more fields]
                                       +- Project [jsontostructs(StructField(COL_1,... 173 more fields) AS LIST#692,body#682,partition#665,offset#666,sequenceNumber#667L,enqueuedTime#668,publisher#669,partitionKey#670,properties#671,systemProperties#672]
                                          +- Project [cast(body#664 as string) AS body#682,systemProperties#672]
                                             +- StreamingExecutionRelation org.apache.spark.sql.eventhubs.EventHubsSource@5a86ea54,[body#664,systemProperties#672]

Caused by: org.apache.spark.sql.streaming.StreamingQueryException: Job aborted.

Jar versions: azure-storage 8.4.0, hadoop-azure 2.9.2
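For reference, in an sbt build the versions above correspond to coordinates along these lines (the group IDs are the standard ones for the two artifacts; this is a sketch, not the exact build file):

// build.sbt fragment (sketch) for the dependency versions in use when the error occurs
libraryDependencies ++= Seq(
  "com.microsoft.azure" % "azure-storage" % "8.4.0",
  "org.apache.hadoop"   % "hadoop-azure"  % "2.9.2"
)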

Solution

I ran into the same issue, and it was resolved after I changed the Spark, azure-storage, and hadoop-azure dependencies to the following versions (a build sketch follows the list):

  • Spark 3.1.1
  • azure-storage 7.0.0
  • hadoop-azure 3.2.0
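In sbt terms, the working combination looks roughly like this (Spark 3.1.1 artifacts are published for Scala 2.12; treat this as a sketch of the versions rather than a complete build file):

// build.sbt fragment (sketch) with the dependency versions that resolved the error
libraryDependencies ++= Seq(
  "org.apache.spark"    %% "spark-sql"     % "3.1.1",
  "com.microsoft.azure" %  "azure-storage" % "7.0.0",
  "org.apache.hadoop"   %  "hadoop-azure"  % "3.2.0"
)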