如何使用Kafka集成在Spring Batch中调用StepExcecutionListener?

问题描述

下面是etl.xml中job的配置

<batch:step id="Produce">

    <batch:partition partitioner="partitioner">

        <batch:handler grid-size="${ partitioner.limit}"></batch:handler>

        <batch:step>

            <batch:tasklet>

                <batch:chunk reader="Reader" writer="kafkaProducer"
                             commit-interval="20000">

                </batch:chunk>

                <batch:listeners>

                    <batch:listener ref="producingListener" />

                    
                </batch:listeners>

            </batch:tasklet>

        </batch:step>

    </batch:partition>

</batch:step>

:job>

下面是用于向该主题发送消息的代码

ListenableFuture > listenableFuture = kafkaTemplate.send(message);

listenableFuture.addCallback(new ListenableFutureCallback >(){

@Override
public void onSuccess(SendResult<String,message > result) {
    log.info("marking as SUCCESS");
    manager.updateStatus(“soMetable”,KafkaResponse.SUCCESS);
}

@Override
public void onFailure(Throwable ex) {
    log.info("marking as FAILURE");
    manager.updateKafkaStatus(soMetable,KafkaResponse.FAILURE);
}

}

执行kafkaTemplate.send(message)后,将调用侦听器并完成作业。我看到了 在作业完成后调用onSuccess(),onFailure()。 我如何更改job的配置,以便在收到来自kafka主题的确认后调用监听器?

解决方法

您是否想举一些示例代码,说明您建议阻止等待未来。可能会有帮助。

我没有尝试以下方法,但这是一个主意:

@for (var i = 0; i < Model.Addresses.Count; i++)
{
    using (Html.BeginHtmlFieldPrefixScope(string.Format("Addresses[{0}]",i)))
    {
        @Html.Partial(MVC.Shared.Views._Address,Model.Addresses[i])
    }
}

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...