Kafka使用者,在处理某些消息时失败 替代

问题描述

我有一个带有单个kafka使用者的spring boot应用程序,用于从某个主题获取消息。 但是有时在处理邮件时会发生错误。

我想继续像往常一样接收以下消息,同时又不能丢失该消息并接收它,例如,下一次在修复该问题后与消费者重新启动该服务。

可以这样做吗?

我知道我需要手动禁用自动提交并成功提交成功的消息,但是,在这种情况下,如果我不为此异常情况抛出任何异常并手动提交每条下一条成功的消息,那么我将丢失以前失败的一个,对吧?

解决方法

如果我正确理解了您的问题,那么您的假设是,异常是由于代码中的问题而发生的,而不是在从主题中读取消息时发生的。在这种情况下,无需重试或采取其他措施即可解决您的问题。

我们通常要做的是捕获异常并将其发送到另一个Kafka主题。理想情况下,您还将添加一些有关异常发生的原因或发生在哪个代码部分的详细信息。修复应用程序中的错误后,您可以使用该其他主题的消息。

我知道我需要手动禁用自动提交并成功提交成功的消息,但是,在这种情况下,如果我不为此异常情况抛出任何异常并手动提交每条下一条成功的消息,那么我将丢失以前失败的一个,对吧?

是的,您的理解是正确的。更准确地说,您不会“松开”消息,但是一旦ConsumerGroup提交了较高的偏移量,它将在没有任何手动修改的情况下永远不会尝试再次读取较低的偏移量。

替代

如果您只希望在极少数情况下会引发异常,而忽略它,则可以始终在纯Kafka中使用consumer.seek()方法

public void seek(TopicPartition partition,long offset)

读取主题中的特定偏移量。

,

是的,您必须手动提交它们。您重试特定消息2-3次。如果重试后失败,则可以将这些消息移至另一个主题,并在修复导致其失败的原因时使用这些消息。这不会阻塞您的队列,也不会丢失消息。

,

我想继续照常接收以下消息 同时不能丢失该消息并接收它,因为 例如,下次与使用者重新启动服务时 修复后。

可以这样做吗?

  1. 您不需要执行手动提交,而是可以选择实现一种机制,通过将事件发布到另一个队列中并延迟使用该事件来进行重试。 =====> Amazon SQS具有delay Queue,但不幸的是,kafka中没有这样的东西,您必须自己编写实现。

    参考文章:

    Article 1

    Article 2

  2. 如果要重试消息处理,则消息的顺序可以根据您的实现而更改。请记住这一点。

  3. 请记住,如果消息处理时间超过max.poll.interval,kafka确实认为使用者死亡。阅读this

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...