问题描述
我们目前正在开发一个数据管道堆栈,我们使用了 CamelAzurestorageblobSinkConnector(0.9.x),它基本上是从 Kafka(cp-kafka-5.0.0) 读取特定主题并将每条记录附加到特定的 Azure AppendBlob。
同步工作正在完美进行,但我们在堆栈中发现了一个小故障。
JSON 记录已附加到 blob 文件中,没有任何换行,如下所示 -
{"uuid":"6e7190e2-987d-44f5-9b20-ba854d8d4274","foo":"bar"}{"uuid":"6f0d3912-b7c1-4cc4-a41b-0d54cd623373"," bar"}{"foo":"bar"}
这会影响 blob 文件的进一步处理。
我们的 CamelAzurestorageblobSinkConnector.properties 如下所示 -
name=CamelAzure-storage-blobSinkConnector
connector.class=org.apache.camel.kafkaconnector.azurestorageblob.CamelAzurestorageblobSinkConnector
tasks.max=1
camel.sink.marshal=json-jackson
# comma separated topics to get messages from
topics=test-topic
camel.sink.path.accountName=<storage-account>
camel.sink.path.containerName=<blob-container>
camel.sink.endpoint.blobName=data/test-topic/${date:Now:yyyyMMdd}/${date:Now:HH}-id.json
camel.sink.endpoint.accessKey=<account-key>
camel.sink.endpoint.operation=commitAppendBlob
camel.sink.endpoint.createAppendBlob=true
camel.sink.endpoint.blobType=appendblob
非常感谢这里的任何帮助!
解决方法
首先,感谢@OneCricketeer 的建议。
我能够成功地在通过接收器连接器的每个事件/记录处附加行分隔符。
已经在camel组件级别做了如下配置:
- 将键值转换器从 JSONConverter 修改为 StringConverter。这是修改后的 CamelAzurestorageblobSinkConnector.properties
name=CamelAzure-storage-blobSinkConnector
connector.class=org.apache.camel.kafkaconnector.azurestorageblob.CamelAzurestorageblobSinkConnector
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
topics=test-topic
camel.sink.path.accountName=<storage-account>
camel.sink.path.containerName=<blob-container>
camel.sink.endpoint.blobName=data/test-topic/${date:now:yyyyMMdd}/${date:now:HH}-id.json
camel.sink.endpoint.accessKey=<account-key>
camel.sink.endpoint.operation=commitAppendBlob
camel.sink.endpoint.createAppendBlob=true
camel.sink.endpoint.blobType=appendblob
- 像这样覆盖 apply() 方法 -
@Override
private R apply (R record) {
String value = (String) operatingValue(record);
String updated_value = value+System.lineSeparator();
System.out.println("UPDATED SMT RECORD: "+updated_value);
return newRecord(record,null,updated_value);
}
现在我们在 blob 中有如下记录 -
{"uuid":"6e7190e2-987d-44f5-9b20-ba854d8d4274","foo":"bar"}
{"uuid":"6f0d3912-b7c1-4cc4-a41b-0d54cd623373","foo":"bar"}