来自整数流的双重数组

问题描述

如何将 Flux 转换为 Flux

我有一个 Flux -> {1,2,3,.....100} 我想将它们按 30 个数字分组 -> [[1,.....30 ],[31,32....60],[61.....100]]

我尝试了以下方法但没有成功。元素以 5 [ [1,4,5],[6,7,8,9,10],.....] 为一组进行分组

Flux<int[][]> groupedData = fluxData.map(x -> {
            int outerarraySize = (int) Math.ceil(x.size() / 30) +1;
            System.out.println(outerarraySize);

            int[][] BoxedData = new int[30][outerarraySize];
            AtomicInteger innerArray = new AtomicInteger(0);
            AtomicInteger outerarray = new AtomicInteger(0);

            x.forEach(ids -> {

                BoxedData[innerArray.get()][outerarray.get()] = ids;
                innerArray.getAndIncrement();
                if (innerArray.get() == 30) {
                    innerArray.set(0);
                    outerarray.getAndIncrement();
                }


            });

解决方法

Flux 有一个很有用的操作符叫做 'buffer',我们可以用它来批量处理整数列表。

我已经像这样创建了我的“fluxData”,只是为了测试我的代码:

    List<Integer> list1 = IntStream.rangeClosed(1,100).boxed().collect(Collectors.toList());
    List<Integer> list2 = IntStream.rangeClosed(1,40).boxed().collect(Collectors.toList());
    List<Integer> list3 = IntStream.rangeClosed(1,70).boxed().collect(Collectors.toList());

    Flux<List<Integer>> fluxData = Flux.just(list1,list2,list3);

现在,我们可以执行以下操作:

    fluxData.map(integersList -> {
                List<List<Integer>> batchesList = Flux.fromStream(integersList.stream())
                        .buffer(30) // This the magic.
                        .collectList()
                        .block();

                // List<List<Integer>> --> int[][]
                int[][] batchesArray = new int[batchesList.size()][];
                for(int i = 0;i < batchesArray.length;i++){
                    batchesArray[i] = new int[batchesList.get(i).size()];
                    for (int j = 0; j < batchesArray[i].length; j++) {
                        batchesArray[i][j] = batchesList.get(i).get(j);
                    }
                }

                return batchesArray;
            })
            .subscribe(batchesArray -> System.out.println(Arrays.deepToString(batchesArray)));

输出:

[[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30],[31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60],[61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90],[91,92,93,94,95,96,97,98,99,100]]
[[1,40]]
[[1,70]]