Problem description
I am creating a Kafka topic from an XML file and writing to that topic in Avro format. I am using Kafka Connect FilePulse for this, and in the documentation I found the ExplodeFilter. I tried to configure it following the documentation, but it does not work. The Connect Docker console shows the following error:
io.streamthoughts.kafka.connect.filepulse.data.DataException: leitura is not a valid field name
at io.streamthoughts.kafka.connect.filepulse.data.TypedStruct.lookupField(TypedStruct.java:464)
at io.streamthoughts.kafka.connect.filepulse.data.TypedStruct.get(TypedStruct.java:226)
at io.streamthoughts.kafka.connect.filepulse.filter.ExplodeFilter.apply(ExplodeFilter.java:66)
at io.streamthoughts.kafka.connect.filepulse.filter.AbstractMergeRecordFilter.apply(AbstractMergeRecordFilter.java:51)
at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline$FilterNode.apply(DefaultRecordFilterPipeline.java:159)
at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline.apply(DefaultRecordFilterPipeline.java:131)
at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline.apply(DefaultRecordFilterPipeline.java:99)
at io.streamthoughts.kafka.connect.filepulse.source.DefaultFileRecordsPollingConsumer.next(DefaultFileRecordsPollingConsumer.java:169)
at io.streamthoughts.kafka.connect.filepulse.source.FilepulseSourceTask.poll(FilepulseSourceTask.java:131)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:272)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:239)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Here is the docker-compose:
connect-file-pulse:
  image: streamthoughts/kafka-connect-file-pulse:latest
  container_name: connect
  depends_on:
    - cp-broker
    - cp-schema-registry
  ports:
    - "8083:8083"
    - "8001:8000"
  environment:
    CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
    CONNECT_REST_ADVERTISED_HOST_NAME: connect
    CONNECT_REST_PORT: 8083
    CONNECT_GROUP_ID: compose-connect-group
    CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
    CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
    CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
    CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
    CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
    CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
    CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components/"
    CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
    CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
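As a quick sanity check (not part of the original setup), the worker's REST API can be used to confirm that the FilePulse plugin is actually loaded, assuming the mapped port 8083 is reachable from the host:

curl -s http://localhost:8083/connector-plugins | grep -i filepulse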
Here is the input XML:
<?xml version="1.0" encoding="UTF-8"?>
<playlists>
  <pai>test</pai>
  <leitura>
    <title>test</title>
    <artist>test</artist>
    <album>test</album>
    <duration>test</duration>
  </leitura>
  <leitura>
    <title>test2</title>
    <artist>test2</artist>
    <album>test2</album>
    <duration>test2</duration>
  </leitura>
</playlists>
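For context, the goal is for the ExplodeFilter to emit one Kafka record per leitura element. A rough sketch of the intended output (illustrative only, not actual connector output; the exact field layout depends on the reader settings):

{"title": "test", "artist": "test", "album": "test", "duration": "test"}
{"title": "test2", "artist": "test2", "album": "test2", "duration": "test2"}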
Here is the connector config:
{
  "config": {
    "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilepulseSourceConnector",
    "filters": "Explode",
    "filters.Explode.type": "io.streamthoughts.kafka.connect.filepulse.filter.ExplodeFilter",
    "filters.Explode.source": "leitura",
    "force.array.on.fields": "leitura",
    "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
    "fs.scanner.class": "io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker",
    "fs.scan.directory.path": "/tmp/kafka-connect/examples/",
    "fs.scan.interval.ms": "10000",
    "fs.scan.filters": "io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
    "file.filter.regex.pattern": ".*\\.xml$",
    "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
    "offset.strategy": "name",
    "topic": "Leituraraw",
    "internal.kafka.reporter.id": "xml-config-start",
    "internal.kafka.reporter.bootstrap.servers": "broker:29092",
    "internal.kafka.reporter.topic": "Leituraraw",
    "tasks.max": 1
  },
  "name": "xml-config"
}
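For completeness, this configuration would typically be submitted to the Connect REST API. A minimal sketch, assuming the JSON above is saved as connector.json (a hypothetical file name) and the worker is reachable on the mapped port 8083:

curl -s -X POST -H "Content-Type: application/json" --data @connector.json http://localhost:8083/connectors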
Solution
The error is caused by the ExplodeFilter not supporting dot notation for selecting the field. This issue has been fixed as of Connect FilePulse v1.5.2.
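Assuming the Connect worker is running a FilePulse release at v1.5.2 or later, the filter should then accept a dot-notation path to the nested field. A minimal sketch of the relevant properties, where the path playlists.leitura is an assumption derived from the input XML (the leitura elements sit under the playlists root element); the rest of the connector configuration from the question stays unchanged:

"filters": "Explode",
"filters.Explode.type": "io.streamthoughts.kafka.connect.filepulse.filter.ExplodeFilter",
"filters.Explode.source": "playlists.leitura"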