用于批量 mongodb 更新的 java Apache Flink 批处理性能

问题描述

专家,

我正在尝试对大型数据集执行一些 ETL 操作作为批处理。我的要求是提取数据,转换它,然后保存到 mongoDB。我正在使用 Apache FLINK,但性能很慢,因为我正在对每一行进行 mongoDB 更新。

有什么方法可以将批量记录下沉,以便提高性能。就像在所有转换之后我们在 mongoDB 上进行批量更新一样。 我们可以将它们全部聚合并最终将其沉入数据库中,就像流 [.aggregate() .sink({批量更新})]


private DataEnrichmentDO submitToFlinkJobManager(ExecutionEnvironment env,List<Tuple2<String,Integer>> inputCollection,long collectionSize) throws Exception  {
        
        try {
            DataSet<Tuple2<String,Integer>> inputCollectionData = env.fromCollection(inputCollection);
            DataSet<String> enrichmentContext = env.fromElements(this.clientContext.toString(),this.collectionContext.toString(),this.enrichColumnDeFinitions.toString(),this.lookupDeFinitions.toString(),this.quantityUnitConversions.toString(),this.technicalDataTypes.toString(),this.errorContext.toString(),this.errorCodeCache.toString(),this.subCurrencyConversions.toString());
            List<DataEnrichmentDO> result = inputCollectionData
                                            .rebalance()
                                            .map(new DataEnrichmentExpressionEvaluator())
                                            .withbroadcastSet(enrichmentContext,"enrichmentContext")
                                            .collect();

我们可以在转换后收集整个集合,然后进行批量mongoDB更新吗?目前在地图功能上我正在做更新操作。我已将并行度设置为 8 { setParallelism(8);}

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)