Problem description
I am using org.apache.flume.agent.embedded.EmbeddedAgent, configured like this:
Map<String,String> configurationProperties = ...;
service.configure(configurationProperties);
where configurationProperties is set to:
{
  "kafkaSink.kafka.producer.reconnect.backoff.max.ms": "30000",
  "processor.type": "load_balance",
  "sinks": "kafkaSink1",
  "channel.keep-alive": "0",
  "channel.checkpointDir": "********************************",
  "kafkaSink.kafka.producer.reconnect.backoff.ms": "2000",
  "channel.dataDirs": "********************************",
  "kafkaSink.kafka.producer.retry.backoff.ms": "1000",
  "processor.selector.maxTimeOut": "60000",
  "kafkaSink.kafka.producer.max.request.size": "5485760",
  "kafkaSink1.flumeBatchSize": "1000",
  "kafkaSink.kafka.producer.buffer.memory": "67108864",
  "kafkaSink.kafka.producer.client.id": "********************************",
  "kafkaSink1.useFlumeEventFormat": "true",
  "kafkaSink1.kafka.topic": "********************************",
  "kafkaSink.kafka.producer.batch.size": "8196",
  "channel.kafka.dataDirs": "********************************",
  "kafkaSink1.type": "KAFKA",
  "channel.backupCheckpointDir": "********************************",
  "kafkaSink1.allowTopicOverride": "true",
  "channel.useDualCheckpoints": "true",
  "kafkaSink.kafka.producer.compression.type": "lz4",
  "processor.maxBackoff": "60000",
  "use_dual_channel": "true",
  "channel.capacity": "1000000",
  "channel.byteCapacityBufferPercentage": "50",
  "channel.transactionCapacity": "1000",
  "channel.byteCapacity": "10485760",
  "channel.type": "file",
  "processor.backoff": "true",
  "channel.kafka.checkpointDir": "********************************",
  "channel.kafka.backupCheckpointDir": "********************************",
  "kafkaSink1.kafka.bootstrap.servers": "********************************",
  "kafkaSink.kafka.producer.acks": "-1"
}
At runtime, it throws the following:
java.lang.NullPointerException
at org.apache.flume.conf.sink.SinkGroupConfiguration.configure(SinkGroupConfiguration.java:52)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateGroups(FlumeConfiguration.java:927)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:384)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.access$000(FlumeConfiguration.java:228)
at org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:153)
at org.apache.flume.conf.FlumeConfiguration.<init>(FlumeConfiguration.java:133)
at org.apache.flume.agent.embedded.MemoryConfigurationProvider.getFlumeConfiguration(MemoryConfigurationProvider.java:45)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
at org.apache.flume.agent.embedded.MaterializedConfigurationProvider.get(MaterializedConfigurationProvider.java:40)
at org.apache.flume.agent.embedded.EmbeddedAgent.doConfigure(EmbeddedAgent.java:161)
at org.apache.flume.agent.embedded.EmbeddedAgent.configure(EmbeddedAgent.java:99)
at ******************.startService(******************)
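Flume does not start anything.
The code at org.apache.flume.conf.sink.SinkGroupConfiguration.configure(SinkGroupConfiguration.java:52) suggests that it is looking for the sinks property. That property is not empty, so it should not raise an error like this...
Does anyone know why? I could not find any documentation about this.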
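One detail in the map above may be worth checking before calling configure: sinks names kafkaSink1, yet many keys start with kafkaSink. (without the 1), and use_dual_channel has no component prefix at all. Whether this has anything to do with the NullPointerException is not established here. The following is only a minimal pre-flight sketch in plain Java that lists such keys. The class EmbeddedConfigCheck and its methods are hypothetical helpers, not part of Flume, and the prefix layout it checks (source., channel., processor., plus one prefix per sink name) is an assumption about how the embedded agent expects keys to be named.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of Flume: inspects a property map
// before it is handed to EmbeddedAgent.configure(...).
final class EmbeddedConfigCheck {

    // Assumption: these are the fixed key prefixes of the embedded-agent layout.
    private static final Set<String> FIXED_PREFIXES =
            new HashSet<>(Arrays.asList("source", "channel", "processor"));

    /** Splits the whitespace-separated "sinks" value into sink names. */
    static List<String> declaredSinks(Map<String, String> props) {
        String raw = props.get("sinks");
        List<String> names = new ArrayList<>();
        if (raw == null || raw.trim().isEmpty()) {
            return names;
        }
        names.addAll(Arrays.asList(raw.trim().split("\\s+")));
        return names;
    }

    /** Returns the declared sinks that have no "<name>.type" entry. */
    static List<String> sinksMissingType(Map<String, String> props) {
        List<String> missing = new ArrayList<>();
        for (String sink : declaredSinks(props)) {
            if (!props.containsKey(sink + ".type")) {
                missing.add(sink);
            }
        }
        return missing;
    }

    /** Returns, sorted, the keys whose first segment is neither a fixed prefix nor a declared sink. */
    static Set<String> unmatchedKeys(Map<String, String> props) {
        Set<String> known = new HashSet<>(FIXED_PREFIXES);
        known.addAll(declaredSinks(props));
        Set<String> unmatched = new TreeSet<>();
        for (String key : props.keySet()) {
            if (key.equals("sinks")) {
                continue; // the sink list itself carries no prefix
            }
            int dot = key.indexOf('.');
            String prefix = dot < 0 ? key : key.substring(0, dot);
            if (!known.contains(prefix)) {
                unmatched.add(key);
            }
        }
        return unmatched;
    }

    public static void main(String[] args) {
        // A small excerpt of the configuration from the question.
        Map<String, String> props = new LinkedHashMap<>();
        props.put("sinks", "kafkaSink1");
        props.put("kafkaSink1.type", "KAFKA");
        props.put("kafkaSink.kafka.producer.acks", "-1");
        props.put("channel.type", "file");
        props.put("processor.type", "load_balance");

        System.out.println("sinks without a type: " + sinksMissingType(props));
        System.out.println("keys with an unmatched prefix: " + unmatchedKeys(props));
    }
}
```

Run against the excerpt in main, the check reports no sink without a type and flags kafkaSink.kafka.producer.acks as a key that belongs to no declared sink. Renaming such keys or declaring the matching sink may or may not change what Flume does with them; the point is only to make the mismatch visible.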
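Solution
In case anyone stumbles upon this question: after further research, I believe the Apache Flume project is dead, and we will stop using it. For details, see the following: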
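- Many unresolved issues, with no response: https://cwiki.apache.org/confluence/display/FLUME/Developer+Section
- The latest release came out about 2 years ago (in the beginning there was a release every 6 months): https://flume.apache.org/index.html
- The wiki has not even been updated for the latest release; its last update was more than 5 years ago: https://cwiki.apache.org/confluence/display/FLUME/Home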
Unfortunately, I will have to close this question this way. I hope it helps someone.