Kafka Streams 恰好一次重新平衡聚合状态数据丢失

问题描述

以恰好一次的方式运行 3 个 Kafka Streams 实例,但在重新启动其中一个流实例时遇到数据丢失(另外 2 个进行重新平衡)。 如果我快速重启实例(在 session.timeout.ms 内),而其他 2 个没有重新平衡,一切都会按预期工作。

  • 输入和输出主题由 6 个分区创建。
  • 运行 3 个 Kafka 代理。
  • 在循环中使用单个 Python 生成器生成数据 (acks='all')。
  • 使用配置了 consumer.override.isolation.level=read_committed 的单个 Kafka Connect 将数据输出到 SQL

我期望聚合数据与我的 python 循环的输出具有相同的计数。只要 Kafka Streams 没有进入重新平衡状态,这就能正常工作。

简而言之,流实例可以:

  1. 收集会话数据,并更新会话状态。
  2. 会话状态的增量更新然后使用窗口重新分区和求和 聚合。

通过我自己的调试输出,我倾向于认为问题与转移聚合状态有关:

  1. 作为会话 X 更新的记录 A 将 0 添加到聚合中。
  2. 聚合的输出现在是 6
  3. 记录 B 是对会话 X 的更新,正在向聚合中添加 1。
  4. 聚合的输出现在是 7
  5. 重新平衡
  6. 对会话 X(可能是也可能不是重播或记录 A)的更新将 0 添加到聚合中。
  7. 聚合的输出现在是 6

代码的简化和剥离版本:(不是真正的 Java 开发人员,对于非最佳语法,抱歉)

public static void main(String[] args) throws Exception {
    props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG,1);
    props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG,2);
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,StreamsConfig.EXACTLY_ONCE);

    final StoreBuilder<KeyValueStore<MediaKey,SessionState>> storeBuilder = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(SESSION_STATE_STORE),mediaKeySerde,sessionStateSerde
    );
    builder.addStateStore(storeBuilder);

    KStream<String,IncomingData> incomingData = builder.stream(
            SESSION_TOPIC,Consumed.with(Serdes.String(),mediaDataSerde));
    KGroupedStream<MediaKey,AggregatedData> mediaData = incomingData
                .transform(new SessionProcessingSupplier(SESSION_STATE_STORE),SESSION_STATE_STORE)
                .selectKey(...)
                .groupByKey(...);

    KTable<Windowed<MediaKey>,AggregatedData> aggregatedMedia = mediaData
                .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
                .aggregate(
                        new Initializer<AggregatedData>() {...},new Aggregator<MediaKey,AggregatedData,AggregatedData>() {
                            @Override
                            public AggregatedData apply(MediaKey key,AggregatedData input,AggregatedData aggregated) {
                                // ... Add stuff to "aggregated"
                                return aggregated
                            }
                        },Materialized.<MediaKey,WindowStore<Bytes,byte[]>>as("aggregated-media")
                                .withValueSerde(aggregatedDataSerde)
               );

    aggregatedMedia.toStream()
            .map(new KeyValueMapper<Windowed<MediaKey>,KeyValue<MediaKey,PostgresOutput>>() {
                @Override
                public KeyValue<MediaKey,PostgresOutput> apply(Windowed<MediaKey> mediaidKey,AggregatedData data) {
                        // ... Some re-formatting and then
                        return new KeyValue<>(mediaidKey.key(),output);
                }
            })
            .to(POSTGRES_TOPIC,Produced.with(mediaKeySerde,postgresSerde));

    final Topology topology = builder.build();
    final KafkaStreams streams = new KafkaStreams(topology,props);

    // Shutdown hook
}

和:

public class SessionProcessingSupplier implements TransformerSupplier<String,Processing.IncomingData,KeyValue<String,Processing.AggregatedData>> {
    @Override
    public Transformer<String,Processing.AggregatedData>> get() {
        return new Transformer<String,Processing.AggregatedData>>() {
            @Override
            public void init(ProcessorContext processorContext) {
                this.context = processorContext;
                this.stateStore = (KeyValueStore<String,Processing.SessionState>) context.getStateStore(sessionStateStoreName);
            }

            Override
            public KeyValue<String,Processing.AggregatedData> transform(String sessionid,Processing.IncomingData data) {
                Processing.SessionState state = this.stateStore.get(sessionid);
                // ... Update or create session state
                return new KeyValue<String,Processing.AggregatedData>(sessionid,output);
            }
        };
    }
}

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...