Problem description
I am practicing with spring-cloud-stream and the Kafka binder, using the following application:
spring:
  cloud:
    function:
      definition: single;react
    stream:
      bindings:
        single-in-0:
          destination: single-in
          group: single-in
        single-out-0:
          destination: single-out
        react-in-0:
          destination: react-in
          group: react-in
        react-out-0:
          destination: react-out
      kafka:
        default:
          consumer:
            enableDlq: true
            dlqName: dlq
        binder:
          brokers: 192.168.153.133:9092
          autoAddPartitions: true
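import java.util.function.Function;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import reactor.core.publisher.Flux;

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    // Imperative function: invoked once per incoming message.
    @Bean
    public Function<String, String> single() {
        return msg -> {
            if ("EXCP".equalsIgnoreCase(msg)) {
                throw new RuntimeException("Received bad message");
            }
            return "OK: " + msg;
        };
    }

    // Reactive function: invoked once, with the whole stream of messages.
    @Bean
    public Function<Flux<String>, Flux<String>> react() {
        return fluxmsg -> fluxmsg.map(msg -> {
            if ("EXCP".equalsIgnoreCase(msg)) {
                throw new RuntimeException("Received bad message");
            }
            return "OK: " + msg;
        });
    }
}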
As you can see, the application is very simple: if the received message is "EXCP", an exception is thrown; otherwise an "OK" message is published.
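The per-message behaviour described above can be checked without Kafka. The following is a minimal sketch, assuming a hypothetical harness class `PerMessageDemo` that calls the same mapping logic once per message and records where each message would end up; the `out <-` and `dlq <-` labels are stand-ins for the output and DLQ topics, not anything the binder prints.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-alone harness; not part of the original application.
class PerMessageDemo {

    // Same logic as the single() bean in the question.
    static Function<String, String> single() {
        return msg -> {
            if ("EXCP".equalsIgnoreCase(msg)) {
                throw new RuntimeException("Received bad message");
            }
            return "OK: " + msg;
        };
    }

    // Calls the function once per message. A failure is recorded for the
    // message that caused it and does not affect the messages that follow.
    static List<String> run(List<String> input) {
        Function<String, String> fn = single();
        List<String> log = new ArrayList<>();
        for (String msg : input) {
            try {
                log.add("out <- " + fn.apply(msg));
            } catch (RuntimeException e) {
                // Stand-in for the binder publishing the record to the DLQ.
                log.add("dlq <- " + msg);
            }
        }
        return log;
    }

    public static void main(String[] args) {
        run(List.of("test-1", "test-2", "EXCP", "test-3")).forEach(System.out::println);
    }
}
```

With one call per message, a failure on "EXCP" leaves "test-3" unaffected, which is the behaviour I expected from both functions.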
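What is not clear to me is why, once a bad message has been read from "react-in", every subsequent message read from that topic ends up in the DLQ topic. For example:
- write "test-1" to "react-in" -> get "OK: test-1" in "react-out"
- write "test-2" to "react-in" -> get "OK: test-2" in "react-out"
- write "EXCP" to "react-in" -> get "EXCP" in "dlq"
- write "test-3" to "react-in" -> get "test-3" in "dlq"
I expected the last message to be published to the "react-out" topic, not to the "dlq" topic. Here are the logs: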
...
ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 100
auto.offset.reset = earliest
bootstrap.servers = [192.168.153.133:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = react-in
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
2020-08-13 09:59:10.622 INFO 17688 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.0
2020-08-13 09:59:10.622 INFO 17688 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84
2020-08-13 09:59:10.622 INFO 17688 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1597305550622
2020-08-13 09:59:10.623 INFO 17688 --- [ main] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-react-in-4,groupId=react-in] Subscribed to topic(s): react-in
2020-08-13 09:59:10.624 INFO 17688 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService
2020-08-13 09:59:10.631 INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-single-in-2,groupId=single-in] Successfully joined group with generation 8
2020-08-13 09:59:10.637 INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-single-in-2,groupId=single-in] Adding newly assigned partitions: single-in-0
2020-08-13 09:59:10.648 INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-single-in-2,groupId=single-in] Setting offset for partition single-in-0 to the committed offset FetchPosition{offset=18,offsetEpoch=Optional.empty,currentLeader=LeaderAndEpoch{leader=Optional[192.168.153.133:9092 (id: 0 rack: null)],epoch=0}}
2020-08-13 09:59:10.658 INFO 17688 --- [ main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@de8039f
2020-08-13 09:59:10.668 INFO 17688 --- [container-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-react-in-4,groupId=react-in] Cluster ID: UzQ_A2bBRdOuprgdvSEIZg
2020-08-13 09:59:10.676 INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-react-in-4,groupId=react-in] Discovered group coordinator 192.168.153.133:9092 (id: 2147483647 rack: null)
2020-08-13 09:59:10.677 INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-react-in-4,groupId=react-in] (Re-)joining group
2020-08-13 09:59:10.689 INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-react-in-4,groupId=react-in] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2020-08-13 09:59:10.689 INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-react-in-4,groupId=react-in] (Re-)joining group
2020-08-13 09:59:10.699 INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-react-in-4,groupId=react-in] Finished assignment for group at generation 4: {consumer-react-in-4-7ca5b03b-bc58-4af0-b4e5-c0666fc2f05a=Assignment(partitions=[react-in-0])}
2020-08-13 09:59:10.723 INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-react-in-4,groupId=react-in] Successfully joined group with generation 4
2020-08-13 09:59:10.723 INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-react-in-4,groupId=react-in] Adding newly assigned partitions: react-in-0
2020-08-13 09:59:10.726 INFO 17688 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-react-in-4,groupId=react-in] Setting offset for partition react-in-0 to the committed offset FetchPosition{offset=51,epoch=0}}
2020-08-13 09:59:10.726 INFO 17688 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : react-in: partitions assigned: [react-in-0]
2020-08-13 09:59:10.728 INFO 17688 --- [ main] com.example.demo.DemoApplication : Started DemoApplication in 4.473 seconds (JVM running for 5.039)
2020-08-13 09:59:10.753 INFO 17688 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : single-in: partitions assigned: [single-in-0]
2020-08-13 09:59:23.925 INFO 17688 --- [container-0-C-1] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [192.168.153.133:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id = producer-3
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
2020-08-13 09:59:23.932 INFO 17688 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.0
2020-08-13 09:59:23.932 INFO 17688 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84
2020-08-13 09:59:23.932 INFO 17688 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1597305563932
2020-08-13 09:59:23.944 INFO 17688 --- [ad | producer-3] org.apache.kafka.clients.Metadata : [Producer clientId=producer-3] Cluster ID: UzQ_A2bBRdOuprgdvSEIZg
2020-08-13 09:59:31.919 INFO 17688 --- [container-0-C-1] o.s.c.s.m.DirectWithAttributesChannel : Channel 'application.react-in-0' has 0 subscriber(s).
2020-08-13 09:59:34.922 ERROR 17688 --- [container-0-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.react-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers,failedMessage=GenericMessage [payload=byte[4],headers={kafka_offset=54,kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@60e67748,deliveryAttempt=3,kafka_timestampType=CREATE_TIME,kafka_receivedPartitionId=0,kafka_receivedTopic=react-in,kafka_receivedTimestamp=1597305564033,contentType=application/json,kafka_groupId=react-in}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:384)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:75)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:443)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:417)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1878)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1860)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1797)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1737)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1634)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1364)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1080)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:988)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers,kafka_groupId=react-in}]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
... 27 more
2020-08-13 09:59:34.925 INFO 17688 --- [container-0-C-1] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [192.168.153.133:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id = producer-4
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
2020-08-13 09:59:34.929 INFO 17688 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.0
2020-08-13 09:59:34.930 INFO 17688 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84
2020-08-13 09:59:34.930 INFO 17688 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1597305574929
2020-08-13 09:59:34.943 INFO 17688 --- [ad | producer-4] org.apache.kafka.clients.Metadata : [Producer clientId=producer-4] Cluster ID: UzQ_A2bBRdOuprgdvSEIZg
2020-08-13 09:59:39.770 ERROR 17688 --- [container-0-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.react-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers,failedMessage=GenericMessage [payload=byte[6],headers={kafka_offset=55,kafka_receivedTimestamp=1597305568884,kafka_groupId=react-in}]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
... 27 more
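Interestingly, the "single" function behaves as I expected:
- write "test-1" to "single-in" -> get "OK: test-1" in "single-out"
- write "test-2" to "single-in" -> get "OK: test-2" in "single-out"
- write "EXCP" to "single-in" -> get "EXCP" in "dlq"
- write "test-3" to "single-in" -> get "OK: test-3" in "single-out"
Can someone explain why, in the Reactor implementation, all messages are published to the DLQ, and what the "Dispatcher has no subscribers" error means?
Thanks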
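One plausible reading of the log, offered as a hedged note rather than a confirmed answer: in Reactor, an exception thrown inside `map` ends the sequence with an error signal, so after the first "EXCP" nothing is subscribed to `react-in-0` any more (the log reports "has 0 subscriber(s)"). Every later record then fails with "Dispatcher has no subscribers" and is sent to the DLQ. If that is the cause, one way around it is to keep exceptions from reaching the `Flux` at all. The sketch below assumes a hypothetical helper `guarded` (not part of Spring Cloud Stream or Reactor) that turns a throwing function into one that reports the failing input and returns an empty `Optional`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical helper class; the names here are illustrative only.
class SafeMapping {

    // Same mapping logic as in the question.
    static Function<String, String> mapping() {
        return msg -> {
            if ("EXCP".equalsIgnoreCase(msg)) {
                throw new RuntimeException("Received bad message");
            }
            return "OK: " + msg;
        };
    }

    // Wraps fn so that it never throws: a failing input is handed to
    // onFailure and mapped to Optional.empty() instead.
    static <T, R> Function<T, Optional<R>> guarded(Function<T, R> fn, Consumer<T> onFailure) {
        return value -> {
            try {
                return Optional.ofNullable(fn.apply(value));
            } catch (RuntimeException e) {
                onFailure.accept(value);
                return Optional.empty();
            }
        };
    }

    public static void main(String[] args) {
        List<String> failed = new ArrayList<>();
        Function<String, Optional<String>> fn = guarded(mapping(), failed::add);
        for (String msg : List.of("test-1", "EXCP", "test-3")) {
            System.out.println(msg + " -> " + fn.apply(msg));
        }
        System.out.println("failed: " + failed);
    }
}
```

Inside `react()`, such a wrapper could be applied as `fluxmsg.map(guarded(fn, onFailure)).filter(Optional::isPresent).map(Optional::get)`, with `onFailure` doing whatever should happen to a bad message, for example sending it to the "dlq" topic yourself. This has not been tested against the binder and only illustrates the idea.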