Kafka Streams EOS 模式 - 通知关闭

问题描述

我有一个 Kafka Streams 应用程序,即使在调试级别,它也没有任何适当的日志记录就关闭了 -

2020-12-18 14:25:36:875 +0000 [Thread-7] INFO  o.apache.kafka.streams.KafkaStreams:? - stream-client [trinity-client-pandprat-estestes5-null-b9346744-6bb4-464d-aeaa-9311ab16ce2c] State transition from REBALANCING to PENDING_SHUTDOWN
    2020-12-18 14:25:36:973 +0000 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [trinity-client-pandprat-estestes5-null-b9346744-6bb4-464d-aeaa-9311ab16ce2c-StreamThread-1] Informed to shut down
    2020-12-18 14:25:36:974 +0000 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [trinity-client-pandprat-estestes5-null-b9346744-6bb4-464d-aeaa-9311ab16ce2c-StreamThread-1] State transition from STARTING to PENDING_SHUTDOWN
    2020-12-18 14:25:36:974 +0000 [XXXXXX-estestes5-null-b9346744-6bb4-464d-aeaa-9311ab16ce2c-StreamThread-1] DEBUG org.apache.kafka.clients.Metadata:? - [Consumer clientId=XXXXXX-estestes5-null-b9346744-6bb4-464d-aeaa-9311ab16ce2c-StreamThread-1-consumer,groupId=XXXXXX-estestes5-null] Updating last seen epoch from 0 to 0 for partition input-event-stream-client-pandprat-estestes5-0
    2020-12-18 14:25:37:075 +0000 [XXXXXX-estestes5-null-b9346744-6bb4-464d-aeaa-9311ab16ce2c-StreamThread-1] DEBUG org.apache.kafka.clients.Metadata:? - [Consumer clientId=XXXXXX-estestes5-null-b9346744-6bb4-464d-aeaa-9311ab16ce2c-StreamThread-1-consumer,groupId=XXXXXX-estestes5-null] Updated cluster Metadata updateVersion 3 to MetadataCache{cluster=Cluster(id = ibD7yxLZQQSg24kQTlFnZA,nodes = [b-9.XXXXX.ap-southeast-1.amazonaws.com:9092 (id: 9 rack: apse1-az3),b-7.XXXX.ap-southeast-1.amazonaws.com:9092 (id: 7 rack: apse1-az1),b-8.XXXX.ap-southeast-1.amazonaws.com:9092 (id: 8 rack: apse1-az2),b-5.XXXX.ap-southeast-1.amazonaws.com:9092 (id: 5 rack: apse1-az3),b-4.XXXX.ap-southeast-1.amazonaws.com:9092 (id: 4 rack: apse1-az2),b-6.XXXX.ap-southeast-1.amazonaws.com:9092 (id: 6 rack: apse1-az1),b-1.XXXX.ap-southeast-1.amazonaws.com:9092 (id: 1 rack: apse1-az3),b-3.XXXX.ap-southeast-1.amazonaws.com:9092 (id: 3 rack: apse1-az2),b-2.XXXX.ap-southeast-1.amazonaws.com:9092 (id: 2 rack: apse1-az1)],partitions = [Partition(topic = input-event-stream-client-pandprat-estestes5,partition = 0,leader = 9,replicas = [9,2,3],isr = [9,offlineReplicas = [])],controller = b-3.XXXXXX-msk-temp.k1lph1.c2.kafka.ap-southeast-1.amazonaws.com:9092 (id: 3 rack: apse1-az2))}
    2020-12-18 14:25:37:172 +0000 [XXXXXX-estestes5-null-b9346744-6bb4-464d-aeaa-9311ab16ce2c-StreamThread-1] INFO  o.a.k.c.c.i.ConsumerCoordinator:? - [Consumer clientId=XXXXXX-estestes5-null-b9346744-6bb4-464d-aeaa-9311ab16ce2c-StreamThread-1-consumer,groupId=XXXXXX-estestes5-null] Revoking prevIoUsly assigned partitions []
    2020-12-18 14:25:37:172 +0000 [kafka-coordinator-heartbeat-thread | XXXXXX-estestes5-null] DEBUG o.a.k.c.c.i.AbstractCoordinator:? - [Consumer clientId=XXXXXX-estestes5-null-b9346744-6bb4-464d-aeaa-9311ab16ce2c-StreamThread-1-consumer,groupId=XXXXXX-estestes5-null] Heartbeat thread started

Kafka 版本 - 2.3.1 代理版本 - 2.2.1 没有抛出异常。 还可以看到类似的场景,其中应用程序也从 RUNNING 移动到 PENDING_SHUTDOWN。请参阅下面的日志 -

Jan 7,2021 @ 15:03:12.253  2021-01-07 09:33:12:252 +0000 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Informed to shut down

    Jan 7,2021 @ 15:03:12.253  2021-01-07 09:33:12:252 +0000 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN

    Jan 7,2021 @ 15:03:06.252  2021-01-07 09:33:06:252 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING

    Jan 7,2021 @ 15:03:06.157  2021-01-07 09:33:06:157 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] INFO  c.a.t.e.EventSequenceProcessor:? - EventSequencer Processor getting initialized with bufferFlushInterval : 100,maxBufferSize : 10000,useExternalKNowledgeTime : true,forwardingLimit: 6000,forwardingIntervalInMillis: 6000

    Jan 7,2021 @ 15:03:06.155  2021-01-07 09:33:06:154 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.AssignedStreamsTasks:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Transitioning stream task 0_0 to running

    Jan 7,2021 @ 15:03:05.183  2021-01-07 09:33:05:183 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.AssignedStreamsTasks:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Transitioning stream task 0_0 to restoring

    Jan 7,2021 @ 15:03:05.180  2021-01-07 09:33:05:179 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.ProcessorStateManager:? - task [0_0] Registering state store event-seq-state-store to its state manager

    Jan 7,2021 @ 15:03:05.168  2021-01-07 09:33:05:167 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.AssignedStreamsTasks:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Initializing stream tasks [0_0]

    Jan 7,2021 @ 15:03:05.163  2021-01-07 09:33:05:162 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] partition assignment took 604 ms.

    Jan 7,2021 @ 15:03:05.163      current active tasks: [0_0]

    Jan 7,2021 @ 15:03:05.163      prevIoUs active tasks: []

    Jan 7,2021 @ 15:03:05.163      current standby tasks: []

    Jan 7,2021 @ 15:03:05.163  
    Jan 7,2021 @ 15:03:04.953  2021-01-07 09:33:04:952 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.ProcessorStateManager:? - task [0_0] Register global stores []

    Jan 7,2021 @ 15:03:04.653  2021-01-07 09:33:04:652 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.ProcessorStateManager:? - task [0_0] Created state store manager for task 0_0 with the acquired state dir lock

    Jan 7,2021 @ 15:03:04.653  2021-01-07 09:33:04:653 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Creating producer client for task 0_0

    Jan 7,2021 @ 15:03:04.559  2021-01-07 09:33:04:558 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] at state PARTITIONS_REVOKED: partitions [input-XXXXXXXX-tf-1test0107-0] assigned at the end of consumer rebalance.

    Jan 7,2021 @ 15:03:04.559      current suspended active tasks: []

    Jan 7,2021 @ 15:03:04.559      current suspended standby tasks: []

    Jan 7,2021 @ 15:03:04.559  2021-01-07 09:33:04:558 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED

    Jan 7,2021 @ 15:03:04.559  2021-01-07 09:33:04:558 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.internals.TaskManager:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Adding assigned tasks as active: {0_0=[input-XXXXXXXX-tf-1test0107-0]}

    Jan 7,2021 @ 15:03:04.559  
    Jan 7,2021 @ 15:03:04.559  2021-01-07 09:33:04:558 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Creating tasks based on assignment.

    Jan 7,2021 @ 15:03:04.386  2021-01-07 09:33:04:386 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] INFO  o.a.k.s.p.i.StreamsPartitionAssignor:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer] Assigned tasks to clients as {60f8a4be-b576-4ed0-9615-b91021cd76e0=[activeTasks: ([0_0]) standbyTasks: ([]) assignedTasks: ([0_0]) prevActiveTasks: ([]) prevstandbyTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}.

    Jan 7,2021 @ 15:03:04.385  2021-01-07 09:33:04:385 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.StreamsPartitionAssignor:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer] Assigning tasks [0_0] to clients {60f8a4be-b576-4ed0-9615-b91021cd76e0=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevstandbyTasks: ([]) prevAssignedTasks: ([]) capacity: 1]} with number of replicas 0

    Jan 7,2021 @ 15:03:04.385  2021-01-07 09:33:04:384 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.StreamsPartitionAssignor:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer] Completed validating internal topics {XXXXXXX-tf-1test0107-null-event-seq-state-store-changelog=InternalTopicMetadata(config=UnwindowedChangelogTopicConfig(name=XXXXXXX-tf-1test0107-null-event-seq-state-store-changelog,topicConfigs={}),numPartitions=1)} in partition assignor.

    Jan 7,2021 @ 15:03:04.385  2021-01-07 09:33:04:384 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.StreamsPartitionAssignor:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer] Created state changelog topics [InternalTopicMetadata(config=UnwindowedChangelogTopicConfig(name=XXXXXXX-tf-1test0107-null-event-seq-state-store-changelog,numPartitions=1)] from the parsed topology.

    Jan 7,2021 @ 15:03:04.058  2021-01-07 09:33:04:058 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.InternalTopicManager:? - stream-thread [main] Topic XXXXXXX-tf-1test0107-null-event-seq-state-store-changelog is unkNown or not found,hence not existed yet.

    Jan 7,2021 @ 15:03:04.058  2021-01-07 09:33:04:058 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.InternalTopicManager:? - stream-thread [main] Going to create topic XXXXXXX-tf-1test0107-null-event-seq-state-store-changelog with 1 partitions and config {cleanup.policy=compact}.

    Jan 7,2021 @ 15:03:04.049  2021-01-07 09:33:03:955 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.StreamsPartitionAssignor:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer] Starting to validate internal topics {XXXXXXX-tf-1test0107-null-event-seq-state-store-changelog=InternalTopicMetadata(config=UnwindowedChangelogTopicConfig(name=XXXXXXX-tf-1test0107-null-event-seq-state-store-changelog,2021 @ 15:03:04.049  2021-01-07 09:33:03:955 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.InternalTopicManager:? - stream-thread [main] Trying to check if topics [XXXXXXX-tf-1test0107-null-event-seq-state-store-changelog] have been created with expected number of partitions.

    Jan 7,2021 @ 15:03:03.955  2021-01-07 09:33:03:954 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.StreamsPartitionAssignor:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer] Starting to validate internal topics {} in partition assignor.

    Jan 7,2021 @ 15:03:03.955  2021-01-07 09:33:03:954 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.StreamsPartitionAssignor:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer] Completed validating internal topics {} in partition assignor.

    Jan 7,2021 @ 15:03:03.955  2021-01-07 09:33:03:955 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.StreamsPartitionAssignor:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer] Created repartition topics [] from the parsed topology.

    Jan 7,2021 @ 15:03:03.953  2021-01-07 09:33:03:953 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.StreamsPartitionAssignor:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer] Constructed client Metadata {60f8a4be-b576-4ed0-9615-b91021cd76e0=ClientMetadata{hostInfo=null,consumers=[XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer-e7ce11d1-7b5a-478f-8aaa-1e46df67bbf3],state=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevstandbyTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}} from the member subscriptions.

    Jan 7,2021 @ 15:03:03.952  2021-01-07 09:33:03:952 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.StreamsPartitionAssignor:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer] Current minimum supported version remains at 4,last seen supported version was 4

    Jan 7,2021 @ 15:03:00.355  2021-01-07 09:33:00:354 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.InternalTopologyBuilder:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] found [input-XXXXXXXX-tf-1test0107] topics possibly matching regex

    Jan 7,2021 @ 15:03:00.355  2021-01-07 09:33:00:354 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.InternalTopologyBuilder:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] updating builder with SubscriptionUpdates{updatedTopicSubscriptions=[input-XXXXXXXX-tf-1test0107]} topic(s) with possible matching regex subscription(s)

    Jan 7,2021 @ 15:03:00.353      current assigned active tasks: []

    Jan 7,2021 @ 15:03:00.353  2021-01-07 09:33:00:352 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] State transition from STARTING to PARTITIONS_REVOKED

    Jan 7,2021 @ 15:03:00.353  2021-01-07 09:33:00:352 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.internals.TaskManager:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Suspending all active tasks [] and standby tasks []

    Jan 7,2021 @ 15:03:00.353  2021-01-07 09:33:00:352 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] at state STARTING: partitions [] revoked at the beginning of consumer rebalance.

    Jan 7,2021 @ 15:03:00.353      suspended standby tasks: []

    Jan 7,2021 @ 15:03:00.353      current assigned standby tasks: []

    Jan 7,2021 @ 15:03:00.353  
    Jan 7,2021 @ 15:03:00.353  2021-01-07 09:33:00:353 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] partition revocation took 1 ms.

    Jan 7,2021 @ 15:03:00.353      suspended active tasks: []

    Jan 7,2021 @ 15:02:58.752  Event Sequencer Server started,listening on 2301

    Jan 7,2021 @ 15:02:57.151  2021-01-07 09:32:57:150 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] State transition from CREATED to STARTING

    Jan 7,2021 @ 15:02:57.151  2021-01-07 09:32:57:150 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Starting

    Jan 7,2021 @ 15:02:57.151  2021-01-07 09:32:57:151 +0000 [main] INFO  c.a.t.c.IngestionTopicConsumer:? - StreamThread Metadata : ThreadMetadata{threadName=XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1,threadState=STARTING,activeTasks=[],standbyTasks=[],consumerClientId=XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer,restoreConsumerClientId=XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-restore-consumer,producerClientIds=[],adminClientId=XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-admin} 

    Jan 7,2021 @ 15:02:57.051  2021-01-07 09:32:57:050 +0000 [main] WARN  o.a.k.c.consumer.ConsumerConfig:? - The configuration 'admin.retry.backoff.ms' was supplied but isn't a kNown config.

    Jan 7,2021 @ 15:02:57.051  2021-01-07 09:32:57:050 +0000 [main] WARN  o.a.k.c.consumer.ConsumerConfig:? - The configuration 'admin.retries' was supplied but isn't a kNown config.

    Jan 7,2021 @ 15:02:56.760  2021-01-07 09:32:56:760 +0000 [main] DEBUG o.a.k.s.p.i.InternalTopicManager:? - stream-thread [main] Configs:

    Jan 7,2021 @ 15:02:56.760  
    Jan 7,2021 @ 15:02:56.753  2021-01-07 09:32:56:752 +0000 [main] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Creating consumer client

    Jan 7,2021 @ 15:02:55.857  2021-01-07 09:32:55:856 +0000 [main] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Creating restore consumer client

    Jan 7,2021 @ 15:02:55.350  2021-01-07 09:32:55:349 +0000 [main] INFO  c.a.t.c.IngestionTopicConsumer:? - Initializing Ingestion Topic Consumer

    Jan 7,2021 @ 15:02:55.350  2021-01-07 09:32:55:349 +0000 [main] INFO  com.arcesium.trinity.EventSequencer:? - Initializing Ingestion Topic Consumer

    Jan 7,2021 @ 15:02:53.551      Source: input-topic (topics: [input-XXXXXXXX-tf-1test0107])

    Jan 7,2021 @ 15:02:53.551      Processor: event-sequencer (stores: [event-seq-state-store])

    Jan 7,2021 @ 15:02:53.551      Sink: output-event-topic (topic: output-XXXXXXXX-tf-1test0107)

    Jan 7,2021 @ 15:02:53.551        <-- event-sequencer

    Jan 7,2021 @ 15:02:53.551  
    Jan 7,2021 @ 15:02:53.551        <-- input-topic

    Jan 7,2021 @ 15:02:53.551  2021-01-07 09:32:53:460 +0000 [main] INFO  c.a.t.config.EventSequencerConfig:? - Topology initialized: Topologies:

    Jan 7,2021 @ 15:02:53.551     Sub-topology: 0

    Jan 7,2021 @ 15:02:53.551        --> event-sequencer

    Jan 7,2021 @ 15:02:53.551        --> output-event-topic

    Jan 7,2021 @ 15:02:30.854  Listening for transport dt_socket at address: 2311

我还看到每当触发重新平衡时都会发生关机。请参阅下面的日志 -

Jan 7,2021 @ 08:43:13.555  2021-01-07 03:13:13:554 +0000 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] Informed to shut down

    Jan 7,2021 @ 08:43:13.555  2021-01-07 03:13:13:554 +0000 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] State transition from PARTITIONS_REVOKED to PENDING_SHUTDOWN

    Jan 7,2021 @ 08:42:50.009  2021-01-07 03:12:50:009 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] DEBUG o.a.k.s.p.internals.StreamTask:? - task [0_0] Committing

    Jan 7,2021 @ 08:42:50.009  2021-01-07 03:12:50:009 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] DEBUG o.a.k.s.p.i.RecordCollectorImpl:? - task [0_0] Flushing producer

    Jan 7,2021 @ 08:42:50.009  2021-01-07 03:12:50:009 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] DEBUG o.a.k.s.p.i.ProcessorStateManager:? - task [0_0] Flushing all stores registered in the state manager

    Jan 7,2021 @ 08:42:50.008      current assigned active tasks: [0_0]

    Jan 7,2021 @ 08:42:50.008  2021-01-07 03:12:50:008 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] DEBUG o.a.k.s.p.internals.TaskManager:? - stream-thread [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] Suspending all active tasks [0_0] and standby tasks []

    Jan 7,2021 @ 08:42:50.008  2021-01-07 03:12:50:008 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] DEBUG o.a.k.s.p.internals.StreamTask:? - task [0_0] Suspending

    Jan 7,2021 @ 08:42:50.008      current assigned standby tasks: []

    Jan 7,2021 @ 08:42:50.008  2021-01-07 03:12:50:008 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED

    Jan 7,2021 @ 08:42:50.008  2021-01-07 03:12:50:008 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] DEBUG o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] at state RUNNING: partitions [input-XXXXXXX-tzobnwpj-0] revoked at the beginning of consumer rebalance.

    Jan 7,2021 @ 08:42:50.008  
    Jan 7,2021 @ 08:42:49.842  2021-01-07 03:12:49:842 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] WARN  o.a.k.c.consumer.internals.Fetcher:? - [Consumer clientId=XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1-consumer,groupId=XXXXXXX-tzobnwpj-null] Received unkNown topic or partition error in fetch for partition input-XXXXXXX-tzobnwpj-0

    Jan 7,2021 @ 08:42:49.652  2021-01-07 03:12:49:652 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] WARN  o.a.k.c.consumer.internals.Fetcher:? - [Consumer clientId=XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1-consumer,2021 @ 08:42:49.651  2021-01-07 03:12:49:650 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] WARN  o.a.k.c.consumer.internals.Fetcher:? - [Consumer clientId=XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1-consumer,2021 @ 08:42:49.649  2021-01-07 03:12:49:649 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] WARN  o.a.k.c.consumer.internals.Fetcher:? - [Consumer clientId=XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1-consumer,groupId=XXXXXXX-tzobnwpj-null] Received unkNown topic or partition error in fetch for partition input-XXXXXXX-tzobnwpj-0

有人知道为什么会发生这种情况吗?

解决方法

日志行 "Informed to shut down" 表明调用了 shutdownStreamThread 方法。 这只能从 2 个地方调用:-

One - KafkaStream close 方法 - 用于实际完全关闭 Kafka 流(最终关闭所有 StreamThreads) 但是您的调试日志并不表明完整的 Kafka 流正在关闭。如果是这种情况,您的日志中会出现以下情况

 log.debug("Stopping Streams client with timeoutMillis = {} ms.",timeoutMs);

两个 - RebalanceListener - onPartitionsAssigned 方法

if (streamThread.assignmentErrorCode.get() == StreamsPartitionAssignor.Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
                log.error("Received error code {} - shutdown",streamThread.assignmentErrorCode.get());
                streamThread.shutdown();
                return;
            } 

这可能意味着由于 INCOMPLETE_SOURCE_TOPIC_METADATA,您的 StreamThread 正在接收关闭请求。 这也可能是暂时性问题,也可能是由于元数据不完整(例如主题名称不存在或拼写错误等)而导致的永久性故障