问题描述
我们在应用程序中使用spring-kafka 2.3.0。在以下场景中观察到了一些处理故障
@Service
@EnableScheduling
public class KafkaService {
public void sendToKafkaProducer(String data) {
kafkaTemplate.send(configuration.getProducer().getTopicName(),data);
}
@KafkaListener(id = "consumer_grpA_id",topics = "#{__listener.getEnvironmentConfiguration().getConsumer().getTopicName()}",groupId = "consumer_grpA",autoStartup = "false")
public void onMessage(ConsumerRecord<String,String> data) throws Exception {
passA(data);
}
private void passB(String message) {
//counter to keep track of retry attempts
if (counter.containsKey(message.getEventID())) {
//RETRY_COUNT = 5
if (counter.get(message.getEventID()) < RETRY_COUNT) {
retryAgain(message);
}
} else {
firstRetryPass(message);
}
}
private void retryAgain(String message) {
counter.put(message.getEventID(),counter.get(message.getEventID()) + 1);
try {
registry.stop(); //pause the listener
} catch (Exception e) {
// Todo Auto-generated catch block
e.printstacktrace();
}
}
private void firstRetryPass(String message) {
// First Time Entry for count and time
counter.put(message.getEventID(),1);
try {
registry.stop();//pause the listener
} catch (Exception e) {
// Todo Auto-generated catch block
e.printstacktrace();
}
}
private void passA(String message) {
try {
passtoTarget(message); //Call target processor
LOGGER.info("Message Processed Successfully to the target");
} catch (Exception e) {
targetUnavailable= true;
passB(message);
}
}
private void passtoTarget(String message){
//processor logic,if the target is not available,retry after 15 mins,call passB func
}
@Scheduled(cron = "0 0/15 * 1/1 * ?")
public void scheduledMethod() {
try {
if (targetUnavailable) {
registry.start();
firstTimeStart = false;
}
LOGGER.info(">>>Scheduler Running ?>>>" + registry.isRunning());
} catch (Exception e) {
LOGGER.error(e.getMessage());
}
}
}
- 在处理间隙之后收到第一条消息后,消费者不会收到第一条消息。随后的消息将被处理。
- 由于我们无法直接访问Kafka主题,因此我们无法识别出没有从消费者那里得到的过程。
- 我们如何跟踪未发生的事件,为什么会这样?
- 我们还配置了一个调度程序,其任务是保持Kafka的注册表运行。那么,当我们已经配置了侦听器时,是否需要此调度程序?
- 如果我们保持侦听器运行,则内存和cpu利用率指标是什么?这是我们在目标关闭时使用Kafka注册表显式停止侦听器的原因之一。因此需要验证这种方法是否可持续。我的直觉是,这与监听器的基本工作背道而驰,因为它的主要工作是继续监听新事件,而不管目标状态如何 已编辑 *
解决方法
-
除非使用
stop(Runnable)
,否则不应该在侦听器线程上停止注册表-否则,由于容器等待侦听器退出,因此将出现死锁和延迟。 -
在处理完由上次轮询获取的所有剩余记录之前(除非您设置了
max.poll.records=1
。)(通过注册表)停止容器实际上不会生效。 -
当侦听器正常退出时,将提交记录的偏移量,以使记录不会在下次启动时重新分发。
您可以在此用例中使用ContainerStoppingErrorHandler
。参见here。
引发异常,错误处理程序将为您停止容器。
但这会在第一次尝试时停止容器。
如果要重试,请使用SeekToCurrentErrorHandler
并在重试用尽后从恢复器中调用ContainerStoppingErrorHandler
。