当我们将 Flink 应用程序部署到 Kinesis Data Analytics 时,不会触发窗口化 本地AWS子任务标签水印标签子任务标签水印

问题描述

我们有一个 Apache Flink POC 应用程序,它在本地运行良好,但在我们部署到 Kinesis Data Analytics (KDA) 后,它不会将记录发送到接收器中。

使用过的技术

本地

  • 来源:Kafka 2.7
    • 1 个经纪人
    • 1 个主题,分区为 1,复制因子为 1
  • 处理:Flink 1.12.1
  • 接收器:Managed ElasticSearch Service 7.9.1(与 AWS 相同的实例)

AWS

  • 来源:亚马逊 MSK Kafka 2.8
    • 3 个经纪人(但我们正在连接到一个
    • 1 个主题,分区为 1,复制因子为 3
  • 处理:Amazon KDA Flink 1.11.1
    • 并行度:2
    • 每个 KPU 的并行度:2
  • 接收器:托管 ElasticSearch 服务 7.9.1

应用逻辑

  1. FlinkKafkaConsumer主题中读取 json 格式的消息
  2. json 映射到域对象,称为 Telemetry
private static DataStream<Telemetry> SetupKafkaSource(StreamExecutionEnvironment environment){
    Properties kafkaProperties = new Properties();
    kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONfig,"broKER1_ADDRESS.amazonaws.com:9092");
    kafkaProperties.setProperty(ConsumerConfig.GROUP_ID_CONfig,"flink_consumer");

    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("THE_TOPIC",new SimpleStringSchema(),kafkaProperties);

    consumer.setStartFromEarliest(); //Just for repeatable testing

    return environment
            .addSource(consumer)
            .map(new MapJsonToTelemetry());
}
  1. 为 EventTimeStamp 选择遥测的时间戳。
    3.1.随着forMonotonousTimeStamps
  2. 遥测的 StateIso 用于 keyBy
    4.1.美国州的两个字母的iso代码
  3. 应用 5 秒翻滚窗口策略
private static SingleOutputStreamOperator<StateAggregatedTelemetry> SetupProcessing(DataStream<Telemetry> telemetries) {
    WatermarkStrategy<Telemetry> wmStrategy =
            WatermarkStrategy
                    .<Telemetry>forMonotonousTimestamps()
                    .withTimestampAssigner((event,timestamp) -> event.TimeStamp);

    return telemetries
            .assignTimestampsAndWatermarks(wmStrategy)
            .keyBy(t -> t.StateIso)
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .process(new WindowCountFunction());
}
  1. 调用自定义 ProcessWindowFunction 来执行一些基本聚合。
    6.1.我们计算单个 StateAggregatedTelemetry
  2. ElasticSearch 被配置为接收器。
    7.1. StateAggregatedTelemetry 数据映射到 HashMap 并推送到 source
    7.2.所有 setBulkFlushXYZ 方法都设置为低值
private static void SetupElasticSearchSink(SingleOutputStreamOperator<StateAggregatedTelemetry> telemetries) {
    List<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(HttpHost.create("https://ELKCLUSTER_ADDRESS.amazonaws.com:443"));

    ElasticsearchSink.Builder<StateAggregatedTelemetry> esSinkBuilder = new ElasticsearchSink.Builder<>(
            httpHosts,(ElasticsearchSinkFunction<StateAggregatedTelemetry>) (element,ctx,indexer) -> {
                Map<String,Object> record = new HashMap<>();

                record.put("stateIso",element.StateIso);
                record.put("healthy",element.Flawless);
                record.put("unhealthy",element.Faulty);
                ...

                LOG.info("Telemetry has been added to the buffer");
                indexer.add(Requests.indexRequest()
                        .index("INDEXPREFIX-"+ from.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")))
                        .source(record,XContentType.JSON));
            }
    );

    //Using low values to make sure that the Flush will happen
    esSinkBuilder.setBulkFlushMaxActions(25);
    esSinkBuilder.setBulkFlushInterval(1000);
    esSinkBuilder.setBulkFlushMaxSizeMb(1);
    esSinkBuilder.setBulkFlushBackoff(true);
    esSinkBuilder.setRestClientFactory(restClientBuilder -> {});

    LOG.info("Sink has been attached to the DataStream");
    telemetries.addSink(esSinkBuilder.build());
}

排除的东西

  • 我们设法将 Kafka、KDA 和 ElasticSearch 放在相同的 VPC 和相同的子网下,以避免需要对每个请求进行签名
  • 从日志中我们可以看到Flink可以到达ES集群。
    请求
{
    "locationinformation": "org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.verifyClientConnection(Elasticsearch7ApiCallBridge.java:135)","logger": "org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge","message": "Pinging Elasticsearch cluster via hosts [https://...es.amazonaws.com:443] ...","threadName": "Window(TumblingEventTimeWindows(5000),EventTimeTrigger,WindowCountFunction) -> (Sink: Print to Std. Out,Sink: Unnamed,Sink: Print to Std. Out) (2/2)","applicationARN": "arn:aws:kinesisanalytics:...","applicationVersionId": "39","messageSchemaVersion": "1","messageType": "INFO"
}

回复

{
    "locationinformation": "org.elasticsearch.client.RequestLogger.logResponse(RequestLogger.java:59)","logger": "org.elasticsearch.client.RestClient","message": "request [HEAD https://...es.amazonaws.com:443/] returned [HTTP/1.1 200 OK]","messageType": "DEBUG"
}
  • 我们还可以通过查看 Flink 仪表板来验证消息是否已从 Kafka 主题中读取并发送以进行处理

    Data is sent and received between tasks

我们没有运气的尝试

  • 我们实现了一个 RichParallelSourceFunction,它发出 1_000_000 条消息然后退出
    • 这在本地环境中运行良好
    • 作业在 AWS 环境中完成,但 sink 端没有数据
  • 我们实现了另一个 RichParallelSourceFunction,它每秒发出 100 条消息
    • 基本上我们有两个循环,一个 while(true) 外部循环和 for 内部循环
    • 在内部循环之后,我们调用Thread.sleep(1000)
    • 这在本地环境中运行良好
    • 但在 AWS 中,我们可以看到检查点的大小不断增长,并且在 ELK 中没有出现任何消​​息
  • 我们尝试使用不同的并行设置运行 KDA 应用程序
    • 但是没有区别
  • 我们还尝试使用不同的水印策略(forBoundedOutOfOrdernesswithIdleNowatermarks
    • 但是没有区别
  • 我们为 ProcessWindowFunctionElasticsearchSinkFunction 添加了日志
    • 每当我们从 IDEA 运行应用程序时,这些日志都会在控制台上
    • 每当我们使用 KDA 运行应用程序时,CloudWatch 中就没有此类日志
      • 添加main 的那些日志确实出现在 CloudWatch 日志中

我们假设在sink端看不到数据是因为没有触发窗口处理逻辑。这就是为什么在 CloudWatch 中看不到处理日志的原因。

欢迎任何帮助!


更新 #1

  • 我们已经尝试将 Flink 版本从 1.12.1 降级到 1.11.1
    • 没有变化
  • 我们尝试过处理时间窗口而不是事件时间
    • 它甚至不适用于本地环境

更新 #2

平均消息大小约为 4kb。以下是示例消息的摘录:

{
  "affiliateCode": "...","appVersion": "1.1.14229","clientId": "guid","clientIpAddr": "...","clientOriginated": true,"connectionType": "Cable/DSL","countryCode": "US","design": "...","device": "...",...
  "deviceSerialNumber": "...","dma": "UNKNowN","eventSource": "...","firstRunTimestamp": 1609091112818,"friendlyDeviceName": "Comcast","fullDevice": "Comcast ...","geoInfo": {
    "continent": {
      "code": "NA","geoname_id": 120
    },"country": {
      "geoname_id": 123,"iso_code": "US"
    },"location": {
      "accuracy_radius": 100,"latitude": 37.751,"longitude": -97.822,"time_zone": "America/Chicago"
    },"registered_country": {
      "geoname_id": 123,"iso_code": "US"
    }
  },"height": 720,"httpUserAgent": "Mozilla/...","isLoggedIn": true,"launchCount": 19,"model": "...","os": "Comcast...","osversion": "...",...
  "platformTenantCode": "...","productCode": "...","requestOrigin": "https://....com","serverTimeUtc": 1617809474787,"serviceCode": "...","serviceOriginated": false,"sessionId": "guid","sessionSequence": 2,"subtype": "...","tEventId": "...",...
  "tRegion": "us-east-1","timeZoneOffset": 5,"timestamp": 1617809473305,"traits": {
    "isp": "Comcast Cable","organization": "..."
  },"type": "...","userId": "guid","version": "v1","width": 1280,"xb3traceId": "guid"
}

我们使用 ObjectMapper 仅解析 json 的部分字段。 Telemetry 类如下所示:

public class Telemetry {
    public String AppVersion;
    public String CountryCode;
    public String ClientId;
    public String DeviceSerialNumber;
    public String EventSource;
    public String SessionId;
    public TelemetrySubTypes SubType; //enum
    public String TRegion;
    public Long TimeStamp;
    public TelemetryTypes Type; //enum
    public String StateIso;
    
    ...
}

更新 #3

来源

子任务标签

ID 收到的字节 收到的记录 发送的字节数 发送的记录 状态
0 0 B 0 0 B 0 正在运行
1 0 B 0 2.83 MB 15,000 正在运行

水印标签

无数据

窗口

子任务标签

ID 收到的字节 收到的记录 发送的字节数 发送的记录 状态
0 1.80 MB 9,501 0 B 0 正在运行
1 1.04 MB 5,499 0 B 0 正在运行

水印

子任务 水印
1 无水印
2 无水印

解决方法

根据您提供的评论和更多信息,问题似乎是两个 Flink 使用者无法从同一分区消费。因此,在您的情况下,只有一个操作符的并行实例会从 kafka 分区中使用,而另一个将处于空闲状态。

通常 Flink 操作员会选择 MIN([all_downstream_parallel_watermarks]),所以在你的情况下,一个 Kafka Consumer 会产生正常的水印,另一个不会产生任何东西(在这种情况下,flink 假设 Long.Min),所以 Flink 会选择较低的一个是 Long.Min。因此,永远不会触发窗口,因为在数据流动时,永远不会生成一个水印。好的做法是在使用 Kafka 时使用与 Kafka 分区数相同的并行度。

,

在与 AWS 人员进行支持会话后,我们发现我们未能在流媒体环境中设置时间特性。

  • 在 1.11.1 中,TimeCharacteristic 的默认值为 IngestionTime
  • 自 1.12.1(参见相关 release notes)起,默认值为 EventTime

在 Flink 1.12 中,默认的流时间特性已更改为 EventTime,因此您不再需要调用此方法来启用事件时间支持。

因此,在我们明确设置 EventTime 之后,它开始生成像魅力一样的水印:

streamingEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);