Spring Cloud Stream: all messages sent to the DLQ

Problem description

I am experimenting with spring-cloud-stream and the Kafka binder, using the following application:

spring:
  cloud:
    function:
definition: single;react
    stream:
      bindings:
        single-in-0:
          destination: single-in
          group: single-in
        single-out-0:
          destination: single-out
        react-in-0:
          destination: react-in
          group: react-in
        react-out-0:
          destination: react-out
      kafka:
        default:
          consumer:
            enableDlq: true
            dlqName: dlq
        binder:
          brokers: 192.168.153.133:9092
          autoAddPartitions: true
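(For context: the `kafka.default.consumer` section above applies the DLQ settings to every consumer binding. A hedged sketch of scoping the DLQ to a single binding instead, assuming the standard per-binding `spring.cloud.stream.kafka.bindings.<binding-name>.consumer` property tree; not part of the original setup:)

```yaml
# Hypothetical variant: enable the DLQ only for the react-in-0 binding,
# instead of the `default` section that covers all consumer bindings.
spring:
  cloud:
    stream:
      kafka:
        bindings:
          react-in-0:
            consumer:
              enableDlq: true
              dlqName: dlq
```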
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class,args);
    }

    
    @Bean
    public Function<String,String> single() {
        return msg -> {
            if ("EXCP".equalsIgnoreCase(msg)) {
                throw new RuntimeException("Received bad message");
            }
            return "OK: " + msg;
        };
    }
    
    @Bean
    public Function<Flux<String>,Flux<String>> react() {
        return fluxmsg -> {
            return fluxmsg.map(msg -> {
                if ("EXCP".equalsIgnoreCase(msg)) {
                    throw new RuntimeException("Received bad message");
                }
                return "OK: " + msg;
            });
        };
    }
}

As you can see, the application is very simple: if the received message is "EXCP", an exception is thrown; otherwise an "OK" message is published.

What I don't understand is why, once a bad message has been read from "react-in", every message subsequently read from that topic ends up in the DLQ topic. For example:

  1. Write "test-1" to "react-in" -> get "OK: test-1" in "react-out"
  2. Write "test-2" to "react-in" -> get "OK: test-2" in "react-out"
  3. Write "EXCP" to "react-in" -> get "EXCP" in "dlq"
  4. Write "test-3" to "react-in" -> get "test-3" in "dlq"
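The asymmetry in the list above can be reproduced without Spring at all. With a per-message function, each invocation is independent, so an exception is scoped to one message; with a single long-lived pipeline, the first exception terminates the pipeline and later elements are never processed. A minimal, hypothetical illustration in plain Java (using `java.util.stream` as a stand-in for `Flux`; class and method names are my own, not framework code):

```java
import java.util.ArrayList;
import java.util.List;

public class PipelineSketch {

    // Like the "single" binding: the function is invoked once per message,
    // so a thrown exception affects only that one message.
    static List<String> perMessage(List<String> input) {
        List<String> out = new ArrayList<>();
        for (String msg : input) {
            try {
                if ("EXCP".equalsIgnoreCase(msg)) {
                    throw new RuntimeException("Received bad message");
                }
                out.add("OK: " + msg);
            } catch (RuntimeException e) {
                // per-message error handling: only this message is dropped,
                // the loop continues with the next one
            }
        }
        return out;
    }

    // Like the "react" binding: one long-lived pipeline. Once an element
    // throws, the whole pipeline terminates and later elements are never
    // processed at all.
    static List<String> singlePipeline(List<String> input) {
        List<String> out = new ArrayList<>();
        try {
            input.stream()
                    .map(msg -> {
                        if ("EXCP".equalsIgnoreCase(msg)) {
                            throw new RuntimeException("Received bad message");
                        }
                        return "OK: " + msg;
                    })
                    .forEach(out::add);
        } catch (RuntimeException e) {
            // the pipeline is dead; "test-3" is never mapped
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> msgs = List.of("test-1", "test-2", "EXCP", "test-3");
        System.out.println(perMessage(msgs));     // [OK: test-1, OK: test-2, OK: test-3]
        System.out.println(singlePipeline(msgs)); // [OK: test-1, OK: test-2]
    }
}
```

This mirrors the observed behavior: after the pipeline dies, the messages still arrive from Kafka but there is nothing left to consume them.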

I would expect that last message to be published to the "react-out" topic, not to "dlq". Here are the logs:

...
ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 100
    auto.offset.reset = earliest
    bootstrap.servers = [192.168.153.133:9092]
    check.crcs = true
    client.dns.lookup = default
    client.id = 
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = react-in
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

2020-08-13 09:59:10.622  INFO 17688 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2020-08-13 09:59:10.622  INFO 17688 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2020-08-13 09:59:10.622  INFO 17688 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1597305550622
2020-08-13 09:59:10.623  INFO 17688 --- [           main] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-react-in-4,groupId=react-in] Subscribed to topic(s): react-in
2020-08-13 09:59:10.624  INFO 17688 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService
2020-08-13 09:59:10.631  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-single-in-2,groupId=single-in] Successfully joined group with generation 8
2020-08-13 09:59:10.637  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-single-in-2,groupId=single-in] Adding newly assigned partitions: single-in-0
2020-08-13 09:59:10.648  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-single-in-2, groupId=single-in] Setting offset for partition single-in-0 to the committed offset FetchPosition{offset=18, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[192.168.153.133:9092 (id: 0 rack: null)], epoch=0}}
2020-08-13 09:59:10.658  INFO 17688 --- [           main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@de8039f
2020-08-13 09:59:10.668  INFO 17688 --- [container-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-react-in-4,groupId=react-in] Cluster ID: UzQ_A2bBRdOuprgdvSEIZg
2020-08-13 09:59:10.676  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-react-in-4, groupId=react-in] Discovered group coordinator 192.168.153.133:9092 (id: 2147483647 rack: null)
2020-08-13 09:59:10.677  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-react-in-4,groupId=react-in] (Re-)joining group
2020-08-13 09:59:10.689  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-react-in-4, groupId=react-in] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2020-08-13 09:59:10.689  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-react-in-4,groupId=react-in] (Re-)joining group
2020-08-13 09:59:10.699  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-react-in-4,groupId=react-in] Finished assignment for group at generation 4: {consumer-react-in-4-7ca5b03b-bc58-4af0-b4e5-c0666fc2f05a=Assignment(partitions=[react-in-0])}
2020-08-13 09:59:10.723  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-react-in-4,groupId=react-in] Successfully joined group with generation 4
2020-08-13 09:59:10.723  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-react-in-4,groupId=react-in] Adding newly assigned partitions: react-in-0
2020-08-13 09:59:10.726  INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-react-in-4, groupId=react-in] Setting offset for partition react-in-0 to the committed offset FetchPosition{offset=51, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[192.168.153.133:9092 (id: 0 rack: null)], epoch=0}}
2020-08-13 09:59:10.726  INFO 17688 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : react-in: partitions assigned: [react-in-0]
2020-08-13 09:59:10.728  INFO 17688 --- [           main] com.example.demo.DemoApplication         : Started DemoApplication in 4.473 seconds (JVM running for 5.039)
2020-08-13 09:59:10.753  INFO 17688 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : single-in: partitions assigned: [single-in-0]
2020-08-13 09:59:23.925  INFO 17688 --- [container-0-C-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [192.168.153.133:9092]
    buffer.memory = 33554432
    client.dns.lookup = default
    client.id = producer-3
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

2020-08-13 09:59:23.932  INFO 17688 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2020-08-13 09:59:23.932  INFO 17688 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2020-08-13 09:59:23.932  INFO 17688 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1597305563932
2020-08-13 09:59:23.944  INFO 17688 --- [ad | producer-3] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-3] Cluster ID: UzQ_A2bBRdOuprgdvSEIZg
2020-08-13 09:59:31.919  INFO 17688 --- [container-0-C-1] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application.react-in-0' has 0 subscriber(s).
2020-08-13 09:59:34.922 ERROR 17688 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.react-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[4], headers={kafka_offset=54, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@60e67748, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=react-in, kafka_receivedTimestamp=1597305564033, contentType=application/json, kafka_groupId=react-in}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:384)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:75)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:443)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:417)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1878)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1860)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1797)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1737)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1634)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1364)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1080)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:988)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers,kafka_groupId=react-in}]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    ... 27 more

2020-08-13 09:59:34.925  INFO 17688 --- [container-0-C-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [192.168.153.133:9092]
    buffer.memory = 33554432
    client.dns.lookup = default
    client.id = producer-4
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

2020-08-13 09:59:34.929  INFO 17688 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2020-08-13 09:59:34.930  INFO 17688 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2020-08-13 09:59:34.930  INFO 17688 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1597305574929
2020-08-13 09:59:34.943  INFO 17688 --- [ad | producer-4] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-4] Cluster ID: UzQ_A2bBRdOuprgdvSEIZg
2020-08-13 09:59:39.770 ERROR 17688 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.react-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[6], headers={kafka_offset=55, kafka_receivedTimestamp=1597305568884, kafka_groupId=react-in}]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    ... 27 more

Interestingly, the "single" function behaves exactly as I expect:

  1. Write "test-1" to "single-in" -> get "OK: test-1" in "single-out"
  2. Write "test-2" to "single-in" -> get "OK: test-2" in "single-out"
  3. Write "EXCP" to "single-in" -> get "EXCP" in "dlq"
  4. Write "test-3" to "single-in" -> get "OK: test-3" in "single-out"

Can someone explain why, with the reactive implementation, every message ends up in the dlq after the first failure, and what the "dispatcher has no subscribers" error means?

Thanks

