Sending Avro messages to Azure Event Hubs with C#, then deserializing them with Scala Structured Streaming on Databricks 7.2 / Spark 3.0

Problem description

I have been stuck on this for the last couple of days. I am having trouble deserializing Avro files that we are producing and sending to Azure Event Hubs. We are trying to do this with Structured Streaming on Databricks Runtime 7.2, using the newer from_avro method described here to deserialize the body of the event messages.

    import org.apache.spark.eventhubs._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.avro._
    import org.apache.avro._
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.avro.functions._

    val connStr = "<EventHubConnectionstring>"

    val customEventhubParameters =
               EventHubsConf(connStr.toString())
               .setMaxEventsPerTrigger(5)
             //.setStartingPosition(EventPosition.fromStartOfStream)

    val incomingStream = spark
                        .readStream
                        .format("eventhubs")
                        .options(customEventhubParameters.toMap)
                        .load()
                        .filter($"properties".getItem("TableName") === "Branches")
    
    val avroSchema = s"""{"type":"record","name":"Branches","fields":[{"name":"_src_ChangeOperation","type":["null","string"]},{"name":"_src_CurrentTrackingId","long"]},{"name":"_src_RecordExtractUTCTimestamp","type":"string"},{"name":"ID","int"]},{"name":"BranchCode",{"name":"BranchName",{"name":"Address1",{"name":"Address2",{"name":"City",{"name":"StateID",{"name":"ZipCode",{"name":"Telephone",{"name":"Contact",{"name":"Title",{"name":"dob",{"name":"TimeZoneID",{"name":"ObserveDaylightSaving","boolean"]},{"name":"PaySummerTimeHour",{"name":"PayWinterTimeHour",{"name":"BillSummerTimeHour",{"name":"BillWinterTimeHour",{"name":"Deleted",{"name":"LastUpdated",{"name":"txJobID",{"name":"SourceID",{"name":"HP_UseHolPayHourMethod",{"name":"HP_HourlyRatePercent","float"]},{"name":"HP_requiredWeeksOfEmployment",{"name":"rgUseSystemSettings",{"name":"rgDutySplitBy",{"name":"rgBasePeriodDate",{"name":"rgFirstDayOfWeek",{"name":"rgDutyStartOfDayTime",{"name":"rgHolidayStartOfDayTime",{"name":"rgMinimumTimePeriod",{"name":"rgLoadPublicTable",{"name":"rgPOTPayPeriodID",{"name":"rgPOT1",{"name":"rgPOT2",{"name":"Facsimile",{"name":"CountryID",{"name":"EmailAddress",{"name":"ContractSecurityHistoricalWeeks",{"name":"ContractSecurityFutureWeeks",{"name":"TimeLinkTelephone1",{"name":"TimeLinkTelephone2",{"name":"TimeLinkTelephone3",{"name":"TimeLinkTelephone4",{"name":"TimeLinkTelephone5",{"name":"AutoTakeMissedCalls",{"name":"AutoTakeMissedCallsDuration",{"name":"AutoTakeApplyDurationtocheckCalls",{"name":"AutoTakeMissedCheckCalls",{"name":"AutoTakeMissedCheckCallsDuration",{"name":"DocumentLocation",{"name":"DefaultPortalAccess",{"name":"DefaultPortalSecurityRoleID",{"name":"EmployeeTemplateID",{"name":"SiteCardTemplateID",{"name":"TSAllowancesHeaderID",{"name":"TSMinimumWageHeaderID",{"name":"TimeLinkClaimMade",{"name":"TSAllowancePeriodBaseDate",{"name":"TSAllowancePeriodID",{"name":"TSMinimumWageCalcmethodID",{"name":"FlexibleShiftsHeaderID",{"name":"SchedulingUseSystemSettings",{"name":"MinimumRestPeriod",{"name":"TSMealBreakHeaderID",{"name":"ServiceTracImportType",{"name":"StandDownDiaryEventID",{"name":"ScheduledDutyChangeMessageTemplateId",{"name":"ScheduledDutyAddedMessageTemplateId",{"name":"ScheduledDutyRemovedMessageTemplateId",{"name":"NegativeMessageResponsesPermitted",{"name":"PortalEventsstandardLocFirst",{"name":"ReminderMessage",{"name":"ReminderMessageDaysBefore",{"name":"ReminderMessageTemplateId",{"name":"ScheduledDutyChangeMessageAllowReply",{"name":"ScheduledDutyAddedMessageAllowReply",{"name":"PayAlertEscalationGroup",{"name":"BudgetedPay",{"name":"PayAlertvariance",{"name":"BusinessUnitID",{"name":"APH_Hours",{"name":"APH_Period",{"name":"APH_PeriodCount",{"name":"AveragePeriodHoursRuleId",{"name":"HolidayScheduleID",{"name":"AutomationRuleProfileId","int"]}]}"""

    val decoded_df = incomingStream
                    .select(
                       from_avro($"body",avroSchema).alias("payload")
                     )

    val query1 = (
                  decoded_df
                 .writeStream
                 .format("memory")
                 .queryName("read_hub")
                 .start()
                 )
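
A quick way to inspect what the memory sink receives (a usage sketch; the read_hub name and the payload alias come from the query above):

    // The memory sink registers an in-memory table named after queryName ("read_hub").
    // payload.* expands the struct returned by from_avro into its individual fields.
    spark.sql("SELECT payload.* FROM read_hub").show(5, false)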

I have verified that the files we send have a valid schema and contain data, and that they make it into the streaming job in the notebook before it fails with the stack trace below (which suggests the data is malformed). However, I am able to write the produced payload out to an .avro file and deserialize it just fine with the ordinary .read.format("avro") method (a sketch of that check follows the stack trace).

    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:413)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:361)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.writeWithV2(WriteToDataSourceV2Exec.scala:322)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.run(WriteToDataSourceV2Exec.scala:329)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:45)
    at org.apache.spark.sql.execution.collect.Collector$.callExecuteCollect(Collector.scala:118)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:69)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:88)
    at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:508)
    at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:480)
    at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:396)
    at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:2986)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3692)
    at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2953)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3684)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:116)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:248)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:101)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:835)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:198)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3682)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2953)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:586)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:116)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:248)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:101)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:835)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:198)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:581)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:276)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:274)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:71)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:581)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:231)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:276)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:274)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:71)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:199)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:193)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:346)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:259)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 37.0 failed 4 times, most recent failure: Lost task 0.3 in stage 37.0 (TID 84, 10.139.64.5, executor 0): org.apache.spark.SparkException: Malformed records are detected in record parsing. Current parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
    at org.apache.spark.sql.avro.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:111)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:731)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$7(WriteToDataSourceV2Exec.scala:438)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1615)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:477)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:385)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
    at org.apache.spark.scheduler.Task.run(Task.scala:117)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:657)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:660)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -40
    at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
    at org.apache.spark.sql.avro.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:100)
    ... 16 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2478)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2427)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2426)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2426)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1131)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1131)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1131)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2678)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2625)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2613)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:917)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2313)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:382)
    ... 46 more
Caused by: org.apache.spark.SparkException: Malformed records are detected in record parsing. Current parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
    at org.apache.spark.sql.avro.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:111)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:731)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$7(WriteToDataSourceV2Exec.scala:438)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1615)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:477)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:385)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
    at org.apache.spark.scheduler.Task.run(Task.scala:117)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:657)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:660)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -40
    at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
    at org.apache.spark.sql.avro.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:100)
    ... 16 more
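
For comparison, this is roughly the batch check that works when the same producer output is saved to a file first (a minimal sketch; the mount path is hypothetical):

    // Hypothetical path holding the exact bytes the producer emits, dumped to a file.
    // Reading it back with the batch Avro reader deserializes without any problem.
    val batchDf = spark.read
        .format("avro")
        .load("/mnt/tmp/Branches.avro")

    batchDf.printSchema()
    batchDf.show(5, false)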

Technologies

  • C# Azure Function v3 on .NET Core, generating the Avro payloads with Avro 1.8.2
  • Avro data serialized to a byte array using the generic writer (not the specific writer) and sent to Azure Event Hubs
  • Databricks Runtime 7.2 / Spark 3.0
  • Databricks notebooks written in Scala
  • A Databricks Structured Streaming notebook that deserializes the Avro messages and writes them to Delta Lake tables

Not using any of the following:

  • Event Hubs Capture
  • Kafka
  • Schema Registry

Solution

OK, so I just figured out what the problem was. This is how we were serializing the Avro messages before sending them to Event Hubs: in our serialize method we used var writer = new GenericDatumWriter<GenericRecord>(schema); together with an IFileWriter<GenericRecord> to write to a memory stream, and then simply took the byte array of that stream, as shown below.

    public byte[] Serialize(DataCapture data)
    {
        var schema = GenerateSchema(data.Schema);
        var writer = new GenericDatumWriter<GenericRecord>(schema);

        using (var ms = new MemoryStream())
        {
            using (IFileWriter<GenericRecord> fileWriter = DataFileWriter<GenericRecord>.OpenWriter(writer, ms))
            {
                foreach (var jsonString in data.Rows)
                {
                    var record = new GenericRecord(schema);
                    var obj = JsonConvert.DeserializeObject<JObject>(jsonString);

                    foreach (var column in data.Schema.Columns)
                    {
                        switch (MapDataType(column.DataTypeName))
                        {
                            case AvroTypeEnum.Boolean:
                                record.Add(column.ColumnName, obj.GetValue(column.ColumnName).Value<bool?>());
                                break;
                            // Map all the other data types etc. ... removed to shorten the example
                            default:
                                record.Add(column.ColumnName, obj.GetValue(column.ColumnName).Value<string>());
                                break;
                        }
                    }
                    fileWriter.Append(record);
                }
            }
            return ms.ToArray();
        }
    }

What we should actually have been doing is using var writer = new DefaultWriter(schema); and var encoder = new BinaryEncoder(ms); and writing each record with writer.Write(record, encoder); before returning the stream's byte array.

    public byte[] Serialize(DataCapture data)
    {
        var schema = GenerateSchema(data.Schema);
        var writer = new DefaultWriter(schema);

        using (var ms = new MemoryStream())
        {
            var encoder = new BinaryEncoder(ms);

            foreach (var jsonString in data.Rows)
            {
                var record = new GenericRecord(schema);
                var obj = JsonConvert.DeserializeObject<JObject>(jsonString);

                foreach (var column in data.Schema.Columns)
                {
                    switch (MapDataType(column.DataTypeName))
                    {
                        case AvroTypeEnum.Boolean:
                            record.Add(column.ColumnName, obj.GetValue(column.ColumnName).Value<bool?>());
                            break;
                        // Map all the other data types etc. ... removed to shorten the example
                    }
                }
                writer.Write(record, encoder);
            }
            return ms.ToArray();
        }
    }

So the lesson learned is that not all Avro memory streams converted to byte[] are created equal. The from_avro method will only deserialize Avro data that has been binary-encoded with the BinaryEncoder class; it will not deserialize data produced with an IFileWriter (the Avro object container file format). If there is something I should be doing instead, please let me know, but this solved my problem. Hopefully my pain will spare someone else the same.
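
One way to see the difference from the consumer side: Avro object container files (what IFileWriter / DataFileWriter produces) always start with the four magic bytes 'O', 'b', 'j', 0x01, while the raw binary encoding that from_avro expects does not. A small diagnostic sketch, assuming the incomingStream DataFrame from the question (the UDF name is made up):

    import org.apache.spark.sql.functions.udf

    // Flags event bodies that carry the Avro object container file magic bytes.
    // from_avro can only decode bodies where this flag is false (raw binary-encoded records).
    val looksLikeContainerFile = udf { (body: Array[Byte]) =>
      body != null && body.length >= 4 &&
        body(0) == 'O'.toByte && body(1) == 'b'.toByte &&
        body(2) == 'j'.toByte && body(3) == 1.toByte
    }

    val diagnosed = incomingStream
        .select($"body", looksLikeContainerFile($"body").alias("isContainerFile"))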