Spark Streaming receivers start slowly?

Problem description

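My Spark Streaming application has to start 400 receivers, but starting them takes a long time. As a result, a lot of data piles up in the processing queue and cannot be processed before the application's time limit expires.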

[Screenshot: processing data in queue]
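Why the queue never drains can be sketched with a simplified timing model. Assuming every batch takes the same processing time and batches run one at a time (Spark Streaming's default), batch n waits n × (processing time − batch interval) before it starts whenever processing is slower than the batch interval, so the delay grows with every batch. `BacklogSketch` and its parameters are hypothetical and not part of the question's code.

```java
/** Simplified model of Spark Streaming scheduling delay (hypothetical helper, not a Spark API). */
class BacklogSketch {
    /**
     * Scheduling delay in seconds of batch n (0-based), assuming every batch takes
     * processingSec to run and batches execute strictly one after another.
     */
    static long schedulingDelay(long batchIntervalSec, long processingSec, int n) {
        // Batch n is created at n * batchIntervalSec. If processing is slower than the
        // interval, it can only start after batches 0..n-1 finish, at n * processingSec.
        return Math.max(0, n * (processingSec - batchIntervalSec));
    }

    public static void main(String[] args) {
        // Example: 5 s batch interval, 7 s of work per batch.
        for (int n = 0; n <= 3; n++) {
            System.out.println("batch " + n + " waits " + schedulingDelay(5, 7, n) + " s");
        }
    }
}
```

With a 5-second interval and 7 seconds of work per batch, each new batch waits 2 seconds longer than the previous one, which is consistent with batches still being queued when `awaitTerminationOrTimeout` returns.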

Code:

            Map<DataFile,List<String>> blockDataFileMap = new HashMap<>();
            if (dataFileMap != null) {
                // split each DataFile into vusers blocks
                for (Entry<DataFile,List<String>> entry : dataFileMap.entrySet()) {
                    DataFile dataFile = entry.getKey();
                    List<String> lines = entry.getValue();
                    if (lines.size() < vusers) {
                        LOG.warn(dataFile + " is too small to split. Whole file will be used for each vuser.");
                        blockDataFileMap.put(dataFile,lines);
                    } else {
                        // receiver i takes one element from each block of vusers lines;
                        // column i of indexList says which position inside block j that is
                        List<String> selectedLines = new ArrayList<>();
                        int block = lines.size() / vusers;
                        for (int j = 0; j < block; j++) {
                            int k = j * vusers + indexList.get(j).get(i);
                            selectedLines.add(lines.get(k));
                        }
                        // also take line (block * vusers + i) from the leftover tail, if it exists
                        if (block * vusers + i < lines.size()) {
                            selectedLines.add(lines.get(block * vusers + i));
                        }
                        blockDataFileMap.put(dataFile,selectedLines);
                    }
                }
            }
            StreamingReceiver receiver = new StreamingReceiver(StorageLevel.MEMORY_AND_DISK(),batchTime,blockDataFileMap,oneOffdataFiles,reportId,serverUrl,new SerializableConfiguration(conf),serviceWrapper);
            list.add(jssc.receiverStream(receiver));
        }

        @SuppressWarnings("unchecked")
        JavaDStream<Result>[] javaDStreams = (JavaDStream<Result>[]) new JavaDStream<?>[list.size()];
        int len = list.size();
        for (int i = 0; i < len; i++) {
            javaDStreams[i] = list.get(i);
        }
        JavaDStream<Result> lines = jssc.union(javaDStreams);

        // List<(chainId,Result)>
        JavaPairDStream<String,Result> pointRdd = lines.mapToPair(new PairFunction<Result,String,Result>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String,Result> call(Result result) throws Exception {
                return new Tuple2<>(result.getChain(),result);
            }
        });

        // groupByKey triggers a shuffle
        pointRdd.groupByKey().map(
                new Function<Tuple2<String,Iterable<Result>>,Tuple2<String,Iterator<Tuple2<Long,Iterator<Result>>>>>() {

                    private static final long serialVersionUID = 5928985493323358322L;

                    // Arg: (chainId,List<Result>)
                    // Return: (chainId,List<(timeInSecond,List<Result>)>)
                    @Override
                    public Tuple2<String,Iterator<Tuple2<Long,Iterator<Result>>>> call(
                            Tuple2<String,Iterable<Result>> v1) throws Exception {
                        String chain = v1._1;
                        Iterator<Result> results = v1._2.iterator();
                        Map<Long,List<Result>> map = new HashMap<>();
                        List<Tuple2<Long,Iterator<Result>>> list = new ArrayList<>();
                        while (results.hasNext()) {
                            Result tmp = results.next();
                            Long time = tmp.getTime();
                            if (!map.containsKey(time)) {
                                map.put(time,new ArrayList<Result>());
                            }
                            map.get(time).add(tmp);
                        }
                        for (Entry<Long,List<Result>> tmp : map.entrySet()) {
                            list.add(new Tuple2<>(tmp.getKey(),tmp.getValue().iterator()));
                        }
                        return new Tuple2<>(chain,list.iterator());
                    }
                }).mapPartitions(new CalculateFunction(broadcast))
                .foreachRDD(new VoidFunction<JavaRDD<Tuple2<String,Map<String,Map<Long,Integer>>>>>() {
                    @Override
                    public void call(JavaRDD<Tuple2<String,Map<String,Map<Long,Integer>>>> tuple2JavaRDD)
                            throws Exception {
                        List<Tuple2<String,Map<String,Map<Long,Integer>>>> resList = tuple2JavaRDD.collect();
                        if (resList.size() > 0) {

                            for (Tuple2<String,Map<String,Map<Long,Integer>>> resTuple : resList) {
                                String chain = resTuple._1();
                                Map<String,Map<Long,Integer>> histogramResultWithTag = resTuple._2();
                                //merge histogram data from MapFunction into the HashMap of the final result
                                if (histogramFinalResultMap.containsKey(chain)) {
                                    Map<String,Map<Long,Integer>> histogramFinalResultWithTag =
                                            histogramFinalResultMap.get(chain);
                                    for (Entry<String,Map<Long,Integer>> tmpEntryWithTag
                                            : histogramResultWithTag.entrySet()) {
                                        String tag = tmpEntryWithTag.getKey();
                                        Map<Long,Integer> histogramResult = tmpEntryWithTag.getValue();
                                        if (histogramFinalResultWithTag.containsKey(tag)) {
                                            Map<Long,Integer> histogramFinalResult =
                                                    histogramFinalResultWithTag.get(tag);
                                            for (Entry<Long,Integer> tmpEntry : histogramResult.entrySet()) {
                                                Long rt = tmpEntry.getKey();
                                                Integer count = tmpEntry.getValue();
                                                if (histogramFinalResult.containsKey(rt)) {
                                                    histogramFinalResult.put(rt,count + histogramFinalResult.get(rt));
                                                } else {
                                                    histogramFinalResult.put(rt,count);
                                                }
                                            }
                                        } else {
                                            histogramFinalResultWithTag.put(tag,histogramResult);
                                        }
                                    }
                                } else {
                                    histogramFinalResultMap.put(chain,histogramResultWithTag);
                                }
                            }
                        }
                    }
                });
        jssc.start();
        jssc.awaitTerminationOrTimeout(timeLimit * 1000);
        jssc.close();
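The block-splitting loop at the top of the code can be exercised on its own. The sketch below is a hypothetical standalone version (`BlockSplitSketch.linesForReceiver` is not from the question) and assumes each row `indexList.get(j)` is a permutation of `0..vusers-1`, since the question does not show how `indexList` is built. Under that assumption, every line of a file with at least `vusers` lines lands in exactly one receiver's block.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Standalone version of the splitting loop (hypothetical helper for illustration). */
class BlockSplitSketch {
    /**
     * Returns the lines receiver i would get. Assumes indexList.get(j) is a
     * permutation of 0..vusers-1 for every block j.
     */
    static List<String> linesForReceiver(List<String> lines, int vusers,
                                         List<List<Integer>> indexList, int i) {
        if (lines.size() < vusers) {
            // Too small to split: every receiver uses the whole file.
            return lines;
        }
        List<String> selected = new ArrayList<>();
        int block = lines.size() / vusers;
        for (int j = 0; j < block; j++) {
            // Column i of row j picks one position inside block j.
            selected.add(lines.get(j * vusers + indexList.get(j).get(i)));
        }
        // Leftover tail: receiver i takes position block * vusers + i if it exists.
        int tail = block * vusers + i;
        if (tail < lines.size()) {
            selected.add(lines.get(tail));
        }
        return selected;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a", "b", "c", "d", "e", "f", "g");
        // Block 0 keeps the natural order, block 1 is rotated.
        List<List<Integer>> indexList = Arrays.asList(
                Arrays.asList(0, 1, 2),
                Arrays.asList(2, 0, 1));
        for (int i = 0; i < 3; i++) {
            System.out.println("receiver " + i + ": " + linesForReceiver(lines, 3, indexList, i));
        }
    }
}
```

Running `main` prints `[a, f, g]`, `[b, d]` and `[c, e]` for receivers 0, 1 and 2, so the seven lines are split across the three receivers without overlap.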

In my application, I initialize 400 receivers.
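The Spark Streaming Programming Guide notes that each receiver runs as a long-running task that occupies one of the cores allocated to the application, and that if the number of allocated cores is less than or equal to the number of receivers, the system will receive data but not be able to process it. With 400 receivers, a rough core budget is therefore worth checking. The executor counts below are hypothetical, and `CoreBudgetSketch` is an illustrative helper, not a Spark API.

```java
/** Rough core budget for receiver-based Spark Streaming (hypothetical helper). */
class CoreBudgetSketch {
    /** Cores left for batch processing once every receiver holds one core. */
    static int processingCores(int executors, int coresPerExecutor, int receivers) {
        return executors * coresPerExecutor - receivers;
    }

    public static void main(String[] args) {
        int receivers = 400; // from the question
        // Hypothetical cluster sizes {executors, cores per executor}; substitute real settings.
        int[][] clusters = {{50, 8}, {60, 8}};
        for (int[] c : clusters) {
            int left = processingCores(c[0], c[1], receivers);
            if (left <= 0) {
                System.out.println(c[0] + " x " + c[1] + " cores: no cores left for processing");
            } else {
                System.out.println(c[0] + " x " + c[1] + " cores: " + left + " cores left for processing");
            }
        }
    }
}
```

A negative result means some receivers may have no core to start on at all; zero means every core is held by a receiver and batches have nowhere to run.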

I don't know why the receivers start so slowly. What are some possible causes?
