How to parse Avro data from Kafka

Problem description

I wrote an Avro schema for my Kafka records and produce them like this:

```java
ProducerRecord<String, TestSchema> record =
    new ProducerRecord<String, TestSchema>("TestTopic0409", testSchema);
```
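For Druid's Avro decoders to see well-formed bytes, the producer must actually serialize the value as Avro binary. A minimal sketch of the producer configuration, assuming the Confluent Avro serializer and a local schema registry (both are assumptions; the original post does not say which serializer was used):

```java
import java.util.Properties;

public class ProducerConfigSketch {
    // Hypothetical helper: builds producer properties for Avro-encoded values.
    // The serializer class and schema.registry.url are assumptions, not taken
    // from the original post.
    public static Properties avroProducerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(avroProducerProps().getProperty("value.serializer"));
    }
}
```

Note that if the Confluent serializer is used, each message carries a 5-byte schema-registry header, and Druid's `avroBytesDecoder` would then need to be of type `schema_registry` rather than `schema_inline`.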

I want to load it into Druid. I started Druid locally, but when I connect to the Kafka data, the preview shows garbled bytes:

[enter image description here][1]

Then I used the following supervisor spec:

```json
{
  "type": "kafka",
  "spec": {
    "dataSchema": {
      "dataSource": "new-data-source",
      "timestampSpec": null,
      "dimensionsSpec": null,
      "metricsSpec": [],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": {
          "type": "none"
        },
        "rollup": true,
        "intervals": null
      },
      "transformSpec": {
        "filter": null,
        "transforms": []
      },
      "parser": {
        "type": "avro_stream",
        "avroBytesDecoder": {
          "type": "schema_inline",
          "schema": {
            "namespace": "com.airebroker.data",
            "name": "Test",
            "type": "record",
            "fields": [
              {"name": "id", "type": "int"},
              {"name": "name", "type": "string"},
              {"name": "timestamp", "type": "long"}
            ]
          }
        },
        "parseSpec": {
          "format": "avro",
          "timestampSpec": {},
          "dimensionsSpec": {}
        }
      }
    },
    "ioConfig": {
      "topic": "TestTopic0409",
      "inputFormat": {
        "type": "avro_ocf",
        "flattenSpec": {
          "useFieldDiscovery": true,
          "fields": []
        },
        "binaryAsString": false
      },
      "replicas": 1,
      "taskCount": 1,
      "taskDuration": "PT3600S",
      "consumerProperties": {
        "bootstrap.servers": "localhost:9092"
      },
      "pollTimeout": 100,
      "startDelay": "PT5S",
      "period": "PT30S",
      "useEarliestOffset": false,
      "completionTimeout": "PT1800S",
      "lateMessageRejectionPeriod": null,
      "earlyMessageRejectionPeriod": null,
      "lateMessageRejectionStartDateTime": null,
      "stream": "TestTopic0409",
      "useEarliestSequenceNumber": false,
      "type": "kafka"
    },
    "tuningConfig": {
      "type": "kafka",
      "maxRowsInMemory": 1000000,
      "maxBytesInMemory": 0,
      "maxRowsPerSegment": 5000000,
      "maxTotalRows": null,
      "intermediatePersistPeriod": "PT10M",
      "basePersistDirectory": "/home/zhangjh/apache-druid-0.20.2/var/tmp/druid-realtime-persist7289903804951562243",
      "maxPendingPersists": 0,
      "indexSpec": {
        "bitmap": {
          "type": "roaring",
          "compressRunOnSerialization": true
        },
        "dimensionCompression": "lz4",
        "metricCompression": "lz4",
        "longEncoding": "longs",
        "segmentLoader": null
      },
      "indexSpecForIntermediatePersists": {
        "bitmap": {
          "type": "roaring"
        }
      },
      "buildV9Directly": true,
      "reportParseExceptions": false,
      "handoffConditionTimeout": 0,
      "resetOffsetAutomatically": false,
      "segmentWriteOutMediumFactory": null,
      "workerThreads": null,
      "chatThreads": null,
      "chatRetries": 8,
      "httpTimeout": "PT10S",
      "shutdownTimeout": "PT80S",
      "offsetFetchPeriod": "PT30S",
      "intermediateHandoffPeriod": "P2147483647D",
      "logParseExceptions": false,
      "maxParseExceptions": 2147483647,
      "maxSavedParseExceptions": 0,
      "skipSequenceNumberAvailabilityCheck": false,
      "repartitionTransitionDuration": "PT120S"
    }
  }
}
```
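For reference, the spec above mixes two ingestion styles: the legacy `parser` block and the newer `inputFormat`, and the `avro_ocf` inputFormat is meant for Avro Object Container Files (which embed their schema), not for bare stream messages. A hedged sketch of an `inputFormat`-only `ioConfig` fragment, assuming a Druid version that ships the `avro_stream` inputFormat and that each Kafka message is plain Avro binary matching the inline schema:

```json
"ioConfig": {
  "topic": "TestTopic0409",
  "inputFormat": {
    "type": "avro_stream",
    "binaryAsString": false,
    "avroBytesDecoder": {
      "type": "schema_inline",
      "schema": {
        "namespace": "com.airebroker.data",
        "name": "Test",
        "type": "record",
        "fields": [
          {"name": "id", "type": "int"},
          {"name": "name", "type": "string"},
          {"name": "timestamp", "type": "long"}
        ]
      }
    }
  },
  "consumerProperties": {"bootstrap.servers": "localhost:9092"},
  "useEarliestOffset": true
}
```

The `dataSchema` would also likely need a non-null `timestampSpec` naming the timestamp column (for example `{"column": "timestamp", "format": "millis"}`) and a populated `dimensionsSpec`, and `druid-avro-extensions` must be in `druid.extensions.loadList`, or rows will fail to parse regardless of the ioConfig.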

It returned: `error: undefined`.

I thought my spec file was malformed, so I tried every approach from the official docs for parsing Avro data from Kafka. They all returned the same thing: `error: undefined`.

Next I tried to decode the Avro data manually and stitch it into JSON via an extension. I defined a class: `public class ExampleByteBufferInputRowParser implements ByteBufferInputRowParser`. Inside `parseBatch` I write a txt file to the tmp path, but when the data is ingested, execution never reaches that method.
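One common reason a custom parser or a `schema_inline` decoder sees "garbage" is that the producer used a schema-registry serializer, which prepends a 5-byte header before the Avro binary. A stdlib-only sketch (class and method names are mine, and the wire format is an assumption about how the bytes were produced) that peels that header off before handing the payload to an Avro decoder:

```java
import java.nio.ByteBuffer;

public class WireFormatSketch {
    // Confluent wire format: byte 0 is a magic 0, bytes 1-4 are the
    // schema id as a big-endian int, and the Avro binary payload follows.
    public static int schemaId(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        if (buf.get() != 0) {
            throw new IllegalArgumentException("not Confluent wire format");
        }
        return buf.getInt();
    }

    public static void main(String[] args) {
        byte[] msg = {0, 0, 0, 0, 42, 1, 2, 3}; // 5-byte header + fake payload
        System.out.println(schemaId(msg)); // prints 42
    }
}
```

If `schemaId` throws on real messages, the payload is probably plain Avro binary (or not Avro at all), which narrows down which `avroBytesDecoder` type the spec should use.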
