Kinesis - 使用 Python Kinesis 聚合生成并使用 Spring Cloud Stream

问题描述

由于 KPL 在 Python 中不可用,我尝试在我的 Python 项目中通过使用 Python Kinesis Aggregation 在发送记录之前聚合记录来模仿 KPL 之类的功能到 Kinesis。我的使用者是一个多实例 Java Spring 应用程序,它使用 Spring Cloud Stream 来监听传入的消息,并启用了 KCL/KPL。

我能够验证 Spring 应用程序可以接收和处理来自另一个使用实际 KPL Java 库 的生产者的消息,并且这些消息在实例中均匀分布。但是,当我使用 Python Aggregation 时,只有一个 Spring 应用程序实例获取了一部分消息,并且它实际上能够处理它们,但其他消息消失了。在 Python 方面,我能够验证我的 Kinesis put 请求是否会发送到不同的分片,因此消息应该分布在 Spring 应用程序实例中。

由于 Spring 应用程序能够处理一部分记录,我认为/希望可能存在我没有在 Spring Cloud Stream 端设置的属性。我知道这是生产者/消费者的奇怪组合,但我希望其他人以前遇到过这个问题,他们可以指出我正确的方向。下面是我的 Python 聚合代码以及 Spring Cloud Stream 属性

Python 聚合代码

from aws_kinesis_agg.aggregator import RecordAggregator

def write_records_to_stream(stream_name,records):
    
    record_aggregator = RecordAggregator()
    partition_key = 1

    for record in records:

        aggregated_records = record_aggregator.add_user_record(
            partition_key=str(partition_key),data=record,explicit_hash_key=None)

        if aggregated_records:
            _write_records_to_stream(aggregated_records,stream_name)
            partition_key += 1

    aggregated_records = record_aggregator.clear_and_get()

    if aggregated_records:
        _write_records_to_stream(aggregated_records,stream_name)

def _write_records_to_stream(aggregated_records,stream_name):
    
    partition_key,hash_key,data = aggregated_records.get_contents()    
    kinesis_client.put_record(StreamName=stream_name,Data=data,PartitionKey=partition_key)

Spring Cloud Stream 属性

spring:
  cloud:
    stream:
      bindings:        
        my-stream:
          consumer:
            autoRebalanceEnabled: true            
            partitioned: false
          content-type: application/*+avro
          destination: my-stream-name
          group: my-group
      kinesis:
        binder:
          kpl-kcl-enabled: true          
        bindings:
          my-stream:
            consumer:
              checkpointMode: manual
              recordsLimit: 10
              idleBetweenPolls: 10000
              listenerMode: batch

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)