异步骆驼组件-立即调用doStop

问题描述

我正在尝试创建一个使用来自外部服务的API的骆驼组件。

我的路线如下

from("myComponent:entity?from=&to=")
.to("seda:one")

from("seda:one")
.aggregate(constant(true),new GroupedBodyAggregationStrategy())
.completionSize(5)
.completionTimeout(5000)
.process( new Processor1() )
to("seda:two")

.
.
.


from("seda:five")
.to("myComponent2:entity")

我实现了组件消费者,如下所示

public class MyComponentConsumer extends DefaultConsumer {

    public MyComponentConsumer(MyComponentEndpoint endpoint,Processor processor) {
        super(endpoint,processor);
    }

    @Override
    protected void doStart() throws Exception {
        super.doStart();
        flag = true;
        while ( flag ) {
            //external API call
            Resource resource = getNextResource();
            if ( resource.next() == null ) {
                flag = false;
            }
            Exchange ex = endpoint.createExchange(ExchangePattern.InOnly);
            ex.getIn().setBody(resource.toString());
            getAsyncProcessor().process(
                            ex
                            donesync -> {
                                LOG.info("Message processed");
                            }
                    );
        }
    }

    @Override
    protected void doStop() throws Exception {
        super.doStop();
        System.out.println("stop ---- ");
    }
}

一切正常,数据在路由中传播。我唯一的问题是,在整个过程完成之前,数据不会传播到下一部分。接下来的部分异步运行。

我查看了StreamConsumer的示例,并尝试使用runnable和executorService将其实现到我的代码中。但是,如果我这样做,消费者就会一开始就停止。

我将代码更改为

public class MyComponentConsumer extends DefaultConsumer implements Runnable 

添加

private ExecutorService executor;
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this,"myComponent");
executor.execute(this);

,然后将我的逻辑移到run()方法中。但是,使用者线程一开始就结束。并且异步处理器无法正确传输数据。

还有其他方法可以实现我需要的功能吗?还是我在这里的某个地方弄错了。任何帮助将不胜感激。

解决方法

您正在使用哪个版本的骆驼?

在骆驼3.x CAMEL-12765中已解决了在骆驼2.x中管理消费者状态的问题,这可能导致您在此处描述该问题。

如果您使用的是骆驼2.x,请尝试使用newScheduledThreadPool而不是newSingleThreadExecutor。 也是executor.schedule(this,5L,TimeUnit.SECONDS),而不是executor.execute(this)。

执行器的启动延迟可能有助于避免您遇到的问题。