问题描述
我正在 Flink 上使用 Kinesis Data Analytics 进行流处理。
我正在处理的用例是从单个 Kinesis 流中读取记录,并在一些转换后写入多个 S3 存储桶。一个源记录可能会出现在多个 S3 存储桶中。我们需要写入多个存储桶,因为源记录包含大量需要拆分到多个 S3 存储桶的信息。
我尝试使用多个接收器来实现这一点。
private static <T> SinkFunction<T> createS3SinkFromStaticConfig(String path,Class<T> type) {
OutputFileConfig config = OutputFileConfig
.builder()
.withPartSuffix(".snappy.parquet")
.build();
final StreamingFileSink<T> sink = StreamingFileSink
.forBulkFormat(new Path(s3SinkPath + "/" + path),createParquetWriter(type))
.withBucketAssigner(new S3BucketAssigner<T>())
.withOutputFileConfig(config)
.withRollingPolicy(new RollingPolicy<T>(DEFAULT_MAX_PART_SIZE,DEFAULT_ROLLOVER_INTERVAL))
.build();
return sink;
}
public static void main(String[] args) throws Exception {
DataStream<PIData> input = createSourceFromStaticConfig(env)
.map(new JsonToSourceDataMap())
.name("jsonToInputDataTransformation");
input.map(value -> value)
.name("rawData")
.addSink(createS3SinkFromStaticConfig("raw_data",InputData.class))
.name("s3Sink");
input.map(FirstConverter::convertInputData)
.addSink(createS3SinkFromStaticConfig("firstOutput",Output1.class));
input.map(SecondConverter::convertInputData)
.addSink(createS3SinkFromStaticConfig("secondOutput",Output2.class));
input.map(ThirdConverter::convertInputData)
.addSink(createS3SinkFromStaticConfig("thirdOutput",Output3.class));
//and so on; There are around 10 buckets.
}
但是,我看到了由此产生的巨大性能影响。由于这个原因,我看到了一个很大的 CPU 峰值(与只有一个接收器的情况相比)。我看到的规模大约是每秒 10 万条记录。
其他注意事项: 我正在使用批量格式编写器,因为我想以镶木地板格式编写文件。我尝试将检查点间隔从 1 分钟增加到 3 分钟,假设每分钟将文件写入 s3 可能会导致问题。但这并没有多大帮助。
由于我是 flink 和流处理的新手,我不确定是否会出现如此大的性能影响,或者我是否可以做得更好? 使用 flatmap 运算符然后使用单个接收器会更好吗?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)