Apache Camel 2.18.0 Consumer不使用

问题描述

我们正在使用Apache骆驼到2.17。*版本,以便利用maxPollRecords参数,我正在尝试升级到2.18.0。升级到2.18.0之后,经纪人似乎不再认可该消费者。以下是我尝试创建的样本使用者。我可以生成从cli到主题的消息,如果在cli中创建使用者,则可以看到在cli中创建的使用者使用了消息,但没有通过apache骆驼创建的使用者。

也可以使用使用者组describe cli命令,如果仅运行apache camel使用者实例,我可以将使用者ID视为空白。当我使用2.17.5运行时,代理用来识别该分区并将其分配给该分区。我找不到例子 请帮忙。

package com.test;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.properties.PropertiesComponent;
import org.apache.camel.impl.DefaultCamelContext;

public class CamelConsumer {

public static void main(String argv[]){
CamelContext camelContext = new DefaultCamelContext();

// Add route to send messages to Kafka

try {
camelContext.addRoutes(new RouteBuilder() {
public void configure() {
PropertiesComponent pc = getContext().getComponent("properties",PropertiesComponent.class);
pc.setLocation("classpath:application.properties");

System.out.println("About to start route: Kafka Server -> Log ");


from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
+ "&maxPollRecords={{consumer.maxPollRecords}}" + "&consumersCount={{consumer.consumersCount}}"
+ "&groupId={{consumer.group}}").routeId("FromKafka")
.process(new Processor() {

@Override
public void process(Exchange exchange) throws Exception {

Message message = exchange.getIn();
Object data = message.getBody();

System.out.println(data);
}
});
}
});

camelContext.start();

Thread.sleep(5 * 60 * 1000);

camelContext.stop();
} catch (Exception e) {
e.printstacktrace();
}
}
}

我也没有任何例外。我找不到与此相关的任何文档。请帮忙。

consumer.topic = test kafka.host = localhost kafka.port = 9092 Consumer.maxPollRecords = 1 Consumer.consumersCount = 1 Consumer.group = test

解决方法

我可以在以下存储库中从2.19 *中找到有效的示例代码。 https://github.com/Talend/apache-camel/branches(分支分支)

https://github.com/apache/camel(骆驼的实际分支)

最终它与2.21.5版本一起使用,我不得不将apache kafka maven版本从0.9 *升级到1.0.0