问题描述
如何将 Flux 转换为 Flux
我有一个 Flux -> {1,2,3,.....100} 我想将它们按 30 个数字分组 -> [[1,.....30 ],[31,32....60],[61.....100]]
我尝试了以下方法但没有成功。元素以 5 [ [1,4,5],[6,7,8,9,10],.....] 为一组进行分组
Flux<int[][]> groupedData = fluxData.map(x -> {
int outerarraySize = (int) Math.ceil(x.size() / 30) +1;
System.out.println(outerarraySize);
int[][] BoxedData = new int[30][outerarraySize];
AtomicInteger innerArray = new AtomicInteger(0);
AtomicInteger outerarray = new AtomicInteger(0);
x.forEach(ids -> {
BoxedData[innerArray.get()][outerarray.get()] = ids;
innerArray.getAndIncrement();
if (innerArray.get() == 30) {
innerArray.set(0);
outerarray.getAndIncrement();
}
});
解决方法
Flux 有一个很有用的操作符叫做 'buffer',我们可以用它来批量处理整数列表。
我已经像这样创建了我的“fluxData”,只是为了测试我的代码:
List<Integer> list1 = IntStream.rangeClosed(1,100).boxed().collect(Collectors.toList());
List<Integer> list2 = IntStream.rangeClosed(1,40).boxed().collect(Collectors.toList());
List<Integer> list3 = IntStream.rangeClosed(1,70).boxed().collect(Collectors.toList());
Flux<List<Integer>> fluxData = Flux.just(list1,list2,list3);
现在,我们可以执行以下操作:
fluxData.map(integersList -> {
List<List<Integer>> batchesList = Flux.fromStream(integersList.stream())
.buffer(30) // This the magic.
.collectList()
.block();
// List<List<Integer>> --> int[][]
int[][] batchesArray = new int[batchesList.size()][];
for(int i = 0;i < batchesArray.length;i++){
batchesArray[i] = new int[batchesList.get(i).size()];
for (int j = 0; j < batchesArray[i].length; j++) {
batchesArray[i][j] = batchesList.get(i).get(j);
}
}
return batchesArray;
})
.subscribe(batchesArray -> System.out.println(Arrays.deepToString(batchesArray)));
输出:
[[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30],[31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60],[61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90],[91,92,93,94,95,96,97,98,99,100]]
[[1,40]]
[[1,70]]