问题描述
我有一个使用 Spring Cloud Stream Kafka 的应用程序。 对于用户定义的主题,我可以通过提供我在下面提到的配置来从指定的主题中删除记录。但此配置不适用于 DLQ 主题。
例如在下面的配置中,我在活页夹级别配置了保留时间。所以我在绑定级别下定义的生产者主题(学生主题)正确配置,我可以检查当主题日志超过指定的保留字节(300000000)时记录是否被删除。
但是活页夹级别的保留时间不起作用 DLQ 主题(person-topic-error-dlq)。除了保留时间之外,是否有任何不同的配置来清除来自 DLQ 主题的记录。
我该怎么做?
spring:
cloud:
stream:
kafka:
bindings:
person-topic-in:
consumer:
enableDlq: true
dlqName: person-topic-error-dlq
binders:
defaultKafka:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
default:
producer:
topic:
properties:
retention.bytes: 300000000
segment.bytes: 300000000
binder:
brokers: localhost:19092
bindings:
person-topic-in:
binder: defaultKafka
destination: person-topic
contentType: application/json
group: person-topic-group
student-topic-out:
binder: defaultKafka
destination: student-topic
contentType: application/json
解决方法
您只是为生产者绑定设置(默认)属性。
也就是说,这仍然对我不起作用:
binders:
defaultKafka:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
default:
producer:
topic:
properties:
retention.bytes: 300000000
segment.bytes: 300000000
consumer:
topic:
properties:
retention.bytes: 300000000
segment.bytes: 300000000
(这些属性甚至不适用于主要主题)。
看起来默认的 kafka 消费者绑定属性有问题。
这对我有用;这些属性适用于主要和死信主题:
spring:
cloud:
stream:
kafka:
bindings:
person-topic-in:
consumer:
enableDlq: true
dlqName: person-topic-error-dlq
topic:
properties:
retention.bytes: 300000000
segment.bytes: 300000000