问题描述
我正在尝试创建一个使用来自外部服务的API的骆驼组件。
我的路线如下
from("myComponent:entity?from=&to=")
.to("seda:one")
from("seda:one")
.aggregate(constant(true),new GroupedBodyAggregationStrategy())
.completionSize(5)
.completionTimeout(5000)
.process( new Processor1() )
to("seda:two")
.
.
.
from("seda:five")
.to("myComponent2:entity")
我实现了组件消费者,如下所示
public class MyComponentConsumer extends DefaultConsumer {
public MyComponentConsumer(MyComponentEndpoint endpoint,Processor processor) {
super(endpoint,processor);
}
@Override
protected void doStart() throws Exception {
super.doStart();
flag = true;
while ( flag ) {
//external API call
Resource resource = getNextResource();
if ( resource.next() == null ) {
flag = false;
}
Exchange ex = endpoint.createExchange(ExchangePattern.InOnly);
ex.getIn().setBody(resource.toString());
getAsyncProcessor().process(
ex
donesync -> {
LOG.info("Message processed");
}
);
}
}
@Override
protected void doStop() throws Exception {
super.doStop();
System.out.println("stop ---- ");
}
}
一切正常,数据在路由中传播。我唯一的问题是,在整个过程完成之前,数据不会传播到下一部分。接下来的部分异步运行。
我查看了StreamConsumer的示例,并尝试使用runnable和executorService将其实现到我的代码中。但是,如果我这样做,消费者就会一开始就停止。
我将代码更改为
public class MyComponentConsumer extends DefaultConsumer implements Runnable
并添加
private ExecutorService executor;
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this,"myComponent");
executor.execute(this);
,然后将我的逻辑移到run()方法中。但是,使用者线程一开始就结束。并且异步处理器无法正确传输数据。
还有其他方法可以实现我需要的功能吗?还是我在这里的某个地方弄错了。任何帮助将不胜感激。
解决方法
您正在使用哪个版本的骆驼?
在骆驼3.x CAMEL-12765中已解决了在骆驼2.x中管理消费者状态的问题,这可能导致您在此处描述该问题。
如果您使用的是骆驼2.x,请尝试使用newScheduledThreadPool而不是newSingleThreadExecutor。 也是executor.schedule(this,5L,TimeUnit.SECONDS),而不是executor.execute(this)。
执行器的启动延迟可能有助于避免您遇到的问题。