Problem description
So I've been banging my head against this for the past few days. I'm having trouble deserializing Avro files that we are generating and sending to Azure Event Hub. We are trying to do this with Databricks Runtime 7.2 structured streaming, using the newer from_avro method described here to deserialize the body of the event message.
import org.apache.spark.eventhubs._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.avro._
import org.apache.avro._
import org.apache.spark.sql.types._
import org.apache.spark.sql.avro.functions._
val connStr = "<EventHubConnectionstring>"
val customEventhubParameters =
  EventHubsConf(connStr.toString())
    .setMaxEventsPerTrigger(5)
    //.setStartingPosition(EventPosition.fromStartOfStream)
val incomingStream = spark
  .readStream
  .format("eventhubs")
  .options(customEventhubParameters.toMap)
  .load()
  .filter($"properties".getItem("TableName") === "Branches")
val avroSchema = s"""{"type":"record","name":"Branches","fields":[{"name":"_src_ChangeOperation","type":["null","string"]},{"name":"_src_CurrentTrackingId","long"]},{"name":"_src_RecordExtractUTCTimestamp","type":"string"},{"name":"ID","int"]},{"name":"BranchCode",{"name":"BranchName",{"name":"Address1",{"name":"Address2",{"name":"City",{"name":"StateID",{"name":"ZipCode",{"name":"Telephone",{"name":"Contact",{"name":"Title",{"name":"dob",{"name":"TimeZoneID",{"name":"ObserveDaylightSaving","boolean"]},{"name":"PaySummerTimeHour",{"name":"PayWinterTimeHour",{"name":"BillSummerTimeHour",{"name":"BillWinterTimeHour",{"name":"Deleted",{"name":"LastUpdated",{"name":"txJobID",{"name":"SourceID",{"name":"HP_UseHolPayHourMethod",{"name":"HP_HourlyRatePercent","float"]},{"name":"HP_requiredWeeksOfEmployment",{"name":"rgUseSystemSettings",{"name":"rgDutySplitBy",{"name":"rgBasePeriodDate",{"name":"rgFirstDayOfWeek",{"name":"rgDutyStartOfDayTime",{"name":"rgHolidayStartOfDayTime",{"name":"rgMinimumTimePeriod",{"name":"rgLoadPublicTable",{"name":"rgPOTPayPeriodID",{"name":"rgPOT1",{"name":"rgPOT2",{"name":"Facsimile",{"name":"CountryID",{"name":"EmailAddress",{"name":"ContractSecurityHistoricalWeeks",{"name":"ContractSecurityFutureWeeks",{"name":"TimeLinkTelephone1",{"name":"TimeLinkTelephone2",{"name":"TimeLinkTelephone3",{"name":"TimeLinkTelephone4",{"name":"TimeLinkTelephone5",{"name":"AutoTakeMissedCalls",{"name":"AutoTakeMissedCallsDuration",{"name":"AutoTakeApplyDurationtocheckCalls",{"name":"AutoTakeMissedCheckCalls",{"name":"AutoTakeMissedCheckCallsDuration",{"name":"DocumentLocation",{"name":"DefaultPortalAccess",{"name":"DefaultPortalSecurityRoleID",{"name":"EmployeeTemplateID",{"name":"SiteCardTemplateID",{"name":"TSAllowancesHeaderID",{"name":"TSMinimumWageHeaderID",{"name":"TimeLinkClaimMade",{"name":"TSAllowancePeriodBaseDate",{"name":"TSAllowancePeriodID",{"name":"TSMinimumWageCalcmethodID",{"name":"FlexibleShiftsHeaderID",{"name":"SchedulingUseSystemSettings",{"name":"MinimumRestPeriod",{"name":"TSMealBreakHeaderID",{"name":"ServiceTracImportType",{"name":"StandDownDiaryEventID",{"name":"ScheduledDutyChangeMessageTemplateId",{"name":"ScheduledDutyAddedMessageTemplateId",{"name":"ScheduledDutyRemovedMessageTemplateId",{"name":"NegativeMessageResponsesPermitted",{"name":"PortalEventsstandardLocFirst",{"name":"ReminderMessage",{"name":"ReminderMessageDaysBefore",{"name":"ReminderMessageTemplateId",{"name":"ScheduledDutyChangeMessageAllowReply",{"name":"ScheduledDutyAddedMessageAllowReply",{"name":"PayAlertEscalationGroup",{"name":"BudgetedPay",{"name":"PayAlertvariance",{"name":"BusinessUnitID",{"name":"APH_Hours",{"name":"APH_Period",{"name":"APH_PeriodCount",{"name":"AveragePeriodHoursRuleId",{"name":"HolidayScheduleID",{"name":"AutomationRuleProfileId","int"]}]}"""
val decoded_df = incomingStream
  .select(
    from_avro($"body", avroSchema).alias("payload")
  )
val query1 = (
  decoded_df
    .writeStream
    .format("memory")
    .queryName("read_hub")
    .start()
)
I have verified that the payloads we are sending have a valid schema and contain data, and that they reach the streaming job in the notebook before it fails with a stack trace indicating the data is malformed. However, I am able to write the generated payload out to an .avro file and deserialize it just fine with the normal batch .read.format("avro") method; a rough sketch of that sanity check is shown next, followed by the stack trace from the streaming job.
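For reference, this is roughly the batch check that works (the path below is a placeholder, not our real mount location):
// Batch sanity check (sketch): the same records written out to an .avro container file
// deserialize without any problem using the plain batch Avro reader.
val batchDf = spark.read
  .format("avro")
  .load("/mnt/landing/branches/sample.avro")  // placeholder path
batchDf.printSchema()
batchDf.show(5, truncate = false)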
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:413)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:361)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.writeWithV2(WriteToDataSourceV2Exec.scala:322)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.run(WriteToDataSourceV2Exec.scala:329)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:45)
at org.apache.spark.sql.execution.collect.Collector$.callExecuteCollect(Collector.scala:118)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:69)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:88)
at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:508)
at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:480)
at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:396)
at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:2986)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3692)
at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2953)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3684)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:116)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:248)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:101)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:835)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:198)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3682)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:2953)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:586)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:116)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:248)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:101)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:835)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:198)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:581)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:276)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:274)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:71)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:581)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:231)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:276)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:274)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:71)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:199)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:193)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:346)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:259)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 37.0 failed 4 times, most recent failure: Lost task 0.3 in stage 37.0 (TID 84, 10.139.64.5, executor 0): org.apache.spark.SparkException: Malformed records are detected in record parsing. Current parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
at org.apache.spark.sql.avro.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:111)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:731)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$7(WriteToDataSourceV2Exec.scala:438)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1615)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:477)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:385)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:657)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:660)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -40
at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at org.apache.spark.sql.avro.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:100)
... 16 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2478)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2427)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2426)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2426)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1131)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1131)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1131)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2678)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2625)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2613)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:917)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2313)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:382)
... 46 more
Caused by: org.apache.spark.SparkException: Malformed records are detected in record parsing. Current parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
at org.apache.spark.sql.avro.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:111)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:731)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$7(WriteToDataSourceV2Exec.scala:438)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1615)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:477)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:385)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:657)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:660)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -40
at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at org.apache.spark.sql.avro.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:100)
... 16 more
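As an aside (not something we use in the original setup): the error message above points at the 'mode' option. In Spark 3.0 from_avro also accepts an options map, so the FAILFAST behaviour could be relaxed as sketched below. Note that this would only turn malformed payloads into null results rather than fix the real cause, which is described in the solution further down.
import scala.collection.JavaConverters._
// Sketch only: pass the parse mode suggested by the error message to from_avro.
// Malformed records then come back as null instead of failing the whole query.
val permissive_df = incomingStream
  .select(
    from_avro($"body", avroSchema, Map("mode" -> "PERMISSIVE").asJava).alias("payload")
  )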
Technologies
- C# Azure Function v3 .NET Core generating the Avro files with Avro 1.8.2
- Serializing the Avro files to a byte array with the generic writer (not the specific writer) and sending them to Azure Event Hub
- Databricks Runtime 7.2 / Scala 3.0
- Databricks notebooks written in Scala
- Databricks structured streaming notebook to deserialize the Avro messages and send them to Delta Lake tables
Not using any of the following:
- Event Hub Capture
- Kafka
- Schema Registry
Solution
OK, so I just figured out what the problem was. This is how we were sending the Avro messages to Event Hub before: in our serialization method we used var writer = new GenericDatumWriter<GenericRecord>(schema); and an IFileWriter<GenericRecord> to write to a memory stream, and then simply took the byte array of that stream, as shown below.
public byte[] Serialize(DataCapture data)
{
    var schema = GenerateSchema(data.Schema);
    var writer = new GenericDatumWriter<GenericRecord>(schema);
    using (var ms = new MemoryStream())
    {
        using (IFileWriter<GenericRecord> fileWriter = DataFileWriter<GenericRecord>.OpenWriter(writer, ms))
        {
            foreach (var jsonString in data.Rows)
            {
                var record = new GenericRecord(schema);
                var obj = JsonConvert.DeserializeObject<JObject>(jsonString);
                foreach (var column in data.Schema.Columns)
                {
                    switch (MapDataType(column.DataTypeName))
                    {
                        case AvroTypeEnum.Boolean:
                            record.Add(column.ColumnName, obj.GetValue(column.ColumnName).Value<bool?>());
                            break;
                        //Map all datatypes etc....removed to shorten example
                        default:
                            record.Add(column.ColumnName, obj.GetValue(column.ColumnName).Value<string>());
                            break;
                    }
                }
                fileWriter.Append(record);
            }
        }
        return ms.ToArray();
    }
}
What we should actually have been doing is using var writer = new DefaultWriter(schema); and var encoder = new BinaryEncoder(ms);, writing each record with writer.Write(record, encoder); before returning the byte array of the stream.
public byte[] Serialize(DataCapture data)
{
    var schema = GenerateSchema(data.Schema);
    var writer = new DefaultWriter(schema);
    using (var ms = new MemoryStream())
    {
        var encoder = new BinaryEncoder(ms);
        foreach (var jsonString in data.Rows)
        {
            var record = new GenericRecord(schema);
            var obj = JsonConvert.DeserializeObject<JObject>(jsonString);
            foreach (var column in data.Schema.Columns)
            {
                switch (MapDataType(column.DataTypeName))
                {
                    case AvroTypeEnum.Boolean:
                        record.Add(column.ColumnName, obj.GetValue(column.ColumnName).Value<bool?>());
                        break;
                    //Map all datatypes etc....removed to shorten example
                    default:
                        record.Add(column.ColumnName, obj.GetValue(column.ColumnName).Value<string>());
                        break;
                }
            }
            writer.Write(record, encoder);
        }
        return ms.ToArray();
    }
}
So the lesson learned is that not all Avro memory streams converted to byte[] are created equal. The from_avro method will only deserialize Avro data that has been binary encoded with the BinaryEncoder class, not data written with an IFileWriter (an Avro object container file). If there is something I should be doing instead, please let me know, but this solved my problem. Hopefully my pain will spare someone else the same. A small diagnostic helper is sketched below.
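If it helps anyone hitting the same mismatch, here is a small hypothetical Scala helper (not part of our pipeline) that distinguishes the two payload shapes: an Avro Object Container File, which is what IFileWriter/DataFileWriter produces, always starts with the 4-byte magic 'O' 'b' 'j' 0x01, while a raw BinaryEncoder payload does not.
// Hypothetical diagnostic helper: true if the payload looks like an Avro Object
// Container File (magic bytes "Obj" + 0x01), i.e. something from_avro cannot parse.
def looksLikeAvroContainerFile(payload: Array[Byte]): Boolean =
  payload.length >= 4 &&
    payload(0) == 'O'.toByte &&
    payload(1) == 'b'.toByte &&
    payload(2) == 'j'.toByte &&
    payload(3) == 1.toByte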