问题描述
我们有一个 Apache Flink POC 应用程序,它在本地运行良好,但在我们部署到 Kinesis Data Analytics (KDA) 后,它不会将记录发送到接收器中。
使用过的技术
本地
- 来源:Kafka 2.7
- 1 个经纪人
- 1 个主题,分区为 1,复制因子为 1
- 处理:Flink 1.12.1
- 接收器:Managed ElasticSearch Service 7.9.1(与 AWS 相同的实例)
AWS
- 来源:亚马逊 MSK Kafka 2.8
- 处理:Amazon KDA Flink 1.11.1
- 并行度:2
- 每个 KPU 的并行度:2
- 接收器:托管 ElasticSearch 服务 7.9.1
应用逻辑
-
FlinkKafkaConsumer
从主题中读取 json 格式的消息 - json 映射到域对象,称为
Telemetry
private static DataStream<Telemetry> SetupKafkaSource(StreamExecutionEnvironment environment){
Properties kafkaProperties = new Properties();
kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONfig,"broKER1_ADDRESS.amazonaws.com:9092");
kafkaProperties.setProperty(ConsumerConfig.GROUP_ID_CONfig,"flink_consumer");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("THE_TOPIC",new SimpleStringSchema(),kafkaProperties);
consumer.setStartFromEarliest(); //Just for repeatable testing
return environment
.addSource(consumer)
.map(new MapJsonToTelemetry());
}
- 为 EventTimeStamp 选择遥测的时间戳。
3.1.随着forMonotonousTimeStamps
- 遥测的
StateIso
用于keyBy
。
4.1.美国州的两个字母的iso代码 - 应用 5 秒翻滚窗口策略
private static SingleOutputStreamOperator<StateAggregatedTelemetry> SetupProcessing(DataStream<Telemetry> telemetries) {
WatermarkStrategy<Telemetry> wmStrategy =
WatermarkStrategy
.<Telemetry>forMonotonousTimestamps()
.withTimestampAssigner((event,timestamp) -> event.TimeStamp);
return telemetries
.assignTimestampsAndWatermarks(wmStrategy)
.keyBy(t -> t.StateIso)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new WindowCountFunction());
}
- 调用自定义
ProcessWindowFunction
来执行一些基本聚合。
6.1.我们计算单个StateAggregatedTelemetry
- ElasticSearch 被配置为接收器。
7.1.StateAggregatedTelemetry
数据映射到HashMap
并推送到source
。
7.2.所有setBulkFlushXYZ
方法都设置为低值
private static void SetupElasticSearchSink(SingleOutputStreamOperator<StateAggregatedTelemetry> telemetries) {
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(HttpHost.create("https://ELKCLUSTER_ADDRESS.amazonaws.com:443"));
ElasticsearchSink.Builder<StateAggregatedTelemetry> esSinkBuilder = new ElasticsearchSink.Builder<>(
httpHosts,(ElasticsearchSinkFunction<StateAggregatedTelemetry>) (element,ctx,indexer) -> {
Map<String,Object> record = new HashMap<>();
record.put("stateIso",element.StateIso);
record.put("healthy",element.Flawless);
record.put("unhealthy",element.Faulty);
...
LOG.info("Telemetry has been added to the buffer");
indexer.add(Requests.indexRequest()
.index("INDEXPREFIX-"+ from.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")))
.source(record,XContentType.JSON));
}
);
//Using low values to make sure that the Flush will happen
esSinkBuilder.setBulkFlushMaxActions(25);
esSinkBuilder.setBulkFlushInterval(1000);
esSinkBuilder.setBulkFlushMaxSizeMb(1);
esSinkBuilder.setBulkFlushBackoff(true);
esSinkBuilder.setRestClientFactory(restClientBuilder -> {});
LOG.info("Sink has been attached to the DataStream");
telemetries.addSink(esSinkBuilder.build());
}
排除的东西
- 我们设法将 Kafka、KDA 和 ElasticSearch 放在相同的 VPC 和相同的子网下,以避免需要对每个请求进行签名
- 从日志中我们可以看到Flink可以到达ES集群。
请求
{
"locationinformation": "org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.verifyClientConnection(Elasticsearch7ApiCallBridge.java:135)","logger": "org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge","message": "Pinging Elasticsearch cluster via hosts [https://...es.amazonaws.com:443] ...","threadName": "Window(TumblingEventTimeWindows(5000),EventTimeTrigger,WindowCountFunction) -> (Sink: Print to Std. Out,Sink: Unnamed,Sink: Print to Std. Out) (2/2)","applicationARN": "arn:aws:kinesisanalytics:...","applicationVersionId": "39","messageSchemaVersion": "1","messageType": "INFO"
}
{
"locationinformation": "org.elasticsearch.client.RequestLogger.logResponse(RequestLogger.java:59)","logger": "org.elasticsearch.client.RestClient","message": "request [HEAD https://...es.amazonaws.com:443/] returned [HTTP/1.1 200 OK]","messageType": "DEBUG"
}
- 我们还可以通过查看 Flink 仪表板来验证消息是否已从 Kafka 主题中读取并发送以进行处理
我们没有运气的尝试
- 我们实现了一个
RichParallelSourceFunction
,它发出 1_000_000 条消息然后退出- 这在本地环境中运行良好
- 作业在 AWS 环境中完成,但 sink 端没有数据
- 我们实现了另一个
RichParallelSourceFunction
,它每秒发出 100 条消息 - 我们尝试使用不同的并行设置运行 KDA 应用程序
- 但是没有区别
- 我们还尝试使用不同的水印策略(
forBoundedOutOfOrderness
、withIdle
、Nowatermarks
)- 但是没有区别
- 我们为
ProcessWindowFunction
和ElasticsearchSinkFunction
添加了日志- 每当我们从 IDEA 运行应用程序时,这些日志都会在控制台上
- 每当我们使用 KDA 运行应用程序时,CloudWatch 中就没有此类日志
- 添加到
main
的那些日志确实出现在 CloudWatch 日志中
- 添加到
我们假设在sink端看不到数据是因为没有触发窗口处理逻辑。这就是为什么在 CloudWatch 中看不到处理日志的原因。
欢迎任何帮助!
更新 #1
- 我们已经尝试将 Flink 版本从 1.12.1 降级到 1.11.1
- 没有变化
- 我们尝试过处理时间窗口而不是事件时间
- 它甚至不适用于本地环境
更新 #2
平均消息大小约为 4kb。以下是示例消息的摘录:
{
"affiliateCode": "...","appVersion": "1.1.14229","clientId": "guid","clientIpAddr": "...","clientOriginated": true,"connectionType": "Cable/DSL","countryCode": "US","design": "...","device": "...",...
"deviceSerialNumber": "...","dma": "UNKNowN","eventSource": "...","firstRunTimestamp": 1609091112818,"friendlyDeviceName": "Comcast","fullDevice": "Comcast ...","geoInfo": {
"continent": {
"code": "NA","geoname_id": 120
},"country": {
"geoname_id": 123,"iso_code": "US"
},"location": {
"accuracy_radius": 100,"latitude": 37.751,"longitude": -97.822,"time_zone": "America/Chicago"
},"registered_country": {
"geoname_id": 123,"iso_code": "US"
}
},"height": 720,"httpUserAgent": "Mozilla/...","isLoggedIn": true,"launchCount": 19,"model": "...","os": "Comcast...","osversion": "...",...
"platformTenantCode": "...","productCode": "...","requestOrigin": "https://....com","serverTimeUtc": 1617809474787,"serviceCode": "...","serviceOriginated": false,"sessionId": "guid","sessionSequence": 2,"subtype": "...","tEventId": "...",...
"tRegion": "us-east-1","timeZoneOffset": 5,"timestamp": 1617809473305,"traits": {
"isp": "Comcast Cable","organization": "..."
},"type": "...","userId": "guid","version": "v1","width": 1280,"xb3traceId": "guid"
}
我们使用 ObjectMapper
仅解析 json 的部分字段。 Telemetry
类如下所示:
public class Telemetry {
public String AppVersion;
public String CountryCode;
public String ClientId;
public String DeviceSerialNumber;
public String EventSource;
public String SessionId;
public TelemetrySubTypes SubType; //enum
public String TRegion;
public Long TimeStamp;
public TelemetryTypes Type; //enum
public String StateIso;
...
}
更新 #3
来源
子任务标签
ID | 收到的字节 | 收到的记录 | 发送的字节数 | 发送的记录 | 状态 |
---|---|---|---|---|---|
0 | 0 B | 0 | 0 B | 0 | 正在运行 |
1 | 0 B | 0 | 2.83 MB | 15,000 | 正在运行 |
水印标签
无数据
窗口
子任务标签
ID | 收到的字节 | 收到的记录 | 发送的字节数 | 发送的记录 | 状态 |
---|---|---|---|---|---|
0 | 1.80 MB | 9,501 | 0 B | 0 | 正在运行 |
1 | 1.04 MB | 5,499 | 0 B | 0 | 正在运行 |
水印
子任务 | 水印 |
---|---|
1 | 无水印 |
2 | 无水印 |
解决方法
根据您提供的评论和更多信息,问题似乎是两个 Flink 使用者无法从同一分区消费。因此,在您的情况下,只有一个操作符的并行实例会从 kafka 分区中使用,而另一个将处于空闲状态。
通常 Flink 操作员会选择 MIN([all_downstream_parallel_watermarks])
,所以在你的情况下,一个 Kafka Consumer 会产生正常的水印,另一个不会产生任何东西(在这种情况下,flink 假设 Long.Min
),所以 Flink 会选择较低的一个是 Long.Min
。因此,永远不会触发窗口,因为在数据流动时,永远不会生成一个水印。好的做法是在使用 Kafka 时使用与 Kafka 分区数相同的并行度。
在与 AWS 人员进行支持会话后,我们发现我们未能在流媒体环境中设置时间特性。
- 在 1.11.1 中,
TimeCharacteristic
的默认值为IngestionTime
。 - 自 1.12.1(参见相关 release notes)起,默认值为
EventTime
:
在 Flink 1.12 中,默认的流时间特性已更改为 EventTime,因此您不再需要调用此方法来启用事件时间支持。
因此,在我们明确设置 EventTime
之后,它开始生成像魅力一样的水印:
streamingEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);