问题描述
对于Kafka Streams,如果将commit.internal.ms配置设置为Long.MAX_VALUE,则可以有效避免Kafka Streams自动提交,但仅在调用较低级别的处理器api context.commit()之后提交,我们才能控制提交或不?如果是,那么在出现问题或消息包含格式错误的情况下,如何从主题中删除这些消息?
资源: https://docs.confluent.io/current/streams/index.html
解决方法
您无法从主题中删除邮件... Kafka主题是仅追加的不可变日志。
还请注意,如果您提交一条消息,则也不会从主题中删除该消息。 Kafka是不是消息队列---提交偏移量仅意味着在日志中跟踪当前进度(即偏移量),但不会删除任何消息。 (您可以将其视为“光标”。)
因此,如果邮件格式错误并且您不想处理该邮件,则可以跳过该邮件。实际上,没有必要为此情况设置较大的提交间隔并进行手动提交。