问题描述
以恰好一次的方式运行 3 个 Kafka Streams 实例,但在重新启动其中一个流实例时遇到数据丢失(另外 2 个进行重新平衡)。
如果我快速重启实例(在 session.timeout.ms
内),而其他 2 个没有重新平衡,一切都会按预期工作。
- 输入和输出主题由 6 个分区创建。
- 运行 3 个 Kafka 代理。
- 在循环中使用单个 Python 生成器生成数据 (
acks='all'
)。 - 使用配置了
consumer.override.isolation.level=read_committed
的单个 Kafka Connect 将数据输出到 SQL
我期望聚合数据与我的 python 循环的输出具有相同的计数。只要 Kafka Streams 没有进入重新平衡状态,这就能正常工作。
简而言之,流实例可以:
- 收集会话数据,并更新会话状态。
- 会话状态的增量更新然后使用窗口重新分区和求和 聚合。
通过我自己的调试输出,我倾向于认为问题与转移聚合状态有关:
- 作为会话 X 更新的记录 A 将 0 添加到聚合中。
- 聚合的输出现在是 6
- 记录 B 是对会话 X 的更新,正在向聚合中添加 1。
- 聚合的输出现在是 7
- 重新平衡
- 对会话 X(可能是也可能不是重播或记录 A)的更新将 0 添加到聚合中。
- 聚合的输出现在是 6
代码的简化和剥离版本:(不是真正的 Java 开发人员,对于非最佳语法,抱歉)
public static void main(String[] args) throws Exception {
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG,1);
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG,2);
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,StreamsConfig.EXACTLY_ONCE);
final StoreBuilder<KeyValueStore<MediaKey,SessionState>> storeBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(SESSION_STATE_STORE),mediaKeySerde,sessionStateSerde
);
builder.addStateStore(storeBuilder);
KStream<String,IncomingData> incomingData = builder.stream(
SESSION_TOPIC,Consumed.with(Serdes.String(),mediaDataSerde));
KGroupedStream<MediaKey,AggregatedData> mediaData = incomingData
.transform(new SessionProcessingSupplier(SESSION_STATE_STORE),SESSION_STATE_STORE)
.selectKey(...)
.groupByKey(...);
KTable<Windowed<MediaKey>,AggregatedData> aggregatedMedia = mediaData
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.aggregate(
new Initializer<AggregatedData>() {...},new Aggregator<MediaKey,AggregatedData,AggregatedData>() {
@Override
public AggregatedData apply(MediaKey key,AggregatedData input,AggregatedData aggregated) {
// ... Add stuff to "aggregated"
return aggregated
}
},Materialized.<MediaKey,WindowStore<Bytes,byte[]>>as("aggregated-media")
.withValueSerde(aggregatedDataSerde)
);
aggregatedMedia.toStream()
.map(new KeyValueMapper<Windowed<MediaKey>,KeyValue<MediaKey,PostgresOutput>>() {
@Override
public KeyValue<MediaKey,PostgresOutput> apply(Windowed<MediaKey> mediaidKey,AggregatedData data) {
// ... Some re-formatting and then
return new KeyValue<>(mediaidKey.key(),output);
}
})
.to(POSTGRES_TOPIC,Produced.with(mediaKeySerde,postgresSerde));
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology,props);
// Shutdown hook
}
和:
public class SessionProcessingSupplier implements TransformerSupplier<String,Processing.IncomingData,KeyValue<String,Processing.AggregatedData>> {
@Override
public Transformer<String,Processing.AggregatedData>> get() {
return new Transformer<String,Processing.AggregatedData>>() {
@Override
public void init(ProcessorContext processorContext) {
this.context = processorContext;
this.stateStore = (KeyValueStore<String,Processing.SessionState>) context.getStateStore(sessionStateStoreName);
}
Override
public KeyValue<String,Processing.AggregatedData> transform(String sessionid,Processing.IncomingData data) {
Processing.SessionState state = this.stateStore.get(sessionid);
// ... Update or create session state
return new KeyValue<String,Processing.AggregatedData>(sessionid,output);
}
};
}
}
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)