问题描述
这里有一个需求:数据集太大,我们需要对数据进行分区,在每个分区计算一个本地结果,然后合并。例如,如果有 100 万条数据分成 100 个分区,那么每个副本将只有大约 10000 条数据。由于需要使用分区数进行调优,分区数需要可变。另外,一个分区的所有数据必须批量计算,不能一一计算。
实现如下:
在分区阶段之后,每条数据都会有一个键来表示它所属的分区。现在,数据应该是这样的:afterPartitionedData=[(0,data1),(0,data2)…(1,data3),(1,data4),…,(99,datan)]
。
接下来,使用 Flink 的 partitionCustom
和 mapPartition
运算符。
dataSet = env. fromCollection(afterPartitionedData)
dataset
.partitionCustom(new myPartitioner(),0)
.mapPartition(new myMapPartitionFunction[(Int,String),Int]())
…
…
class myPartitioner extends Partitioner[Int]{
override def partition(key: Int,numPartitions: Int) = {
println("numPartitions="+numPartitions) // 6,cpu number
key // just return the partitionID
}
}
然而,却报错:
...
Caused by: java.lang.Arrayindexoutofboundsexception: 6
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:226)
...
这个好像是因为Flink的默认分区数DataSet
是cpu的数量,在我的电脑上是6,所以会报 java.lang.Arrayindexoutofboundsexception : 6
。
那么我的问题是:有没有办法随意改变分区数?我在 API Partitioner 的方法 Partition (key: int,numpartitions: int)
中找到了这个参数,但不知道如何更改它。
有没有办法改变DataSet
分区的数量?
Flink 版本为 1.6,测试代码为:
object SimpleFlinkFromBlog {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val afterPartitionedData = new mutable.MutableList[(Int,String)]
afterPartitionedData.+=((0,"0"))
afterPartitionedData.+=((1,"1"))
afterPartitionedData.+=((2,"2"))
afterPartitionedData.+=((2,"2"))
afterPartitionedData.+=((3,"3"))
afterPartitionedData.+=((3,"3"))
afterPartitionedData.+=((4,"4"))
afterPartitionedData.+=((5,"5"))
afterPartitionedData.+=((5,"5"))
// Comment this line will not report an error.
// java.lang.Arrayindexoutofboundsexception : 6
afterPartitionedData.+=((6,"will wrong"))
val dataSet = env.fromCollection( afterPartitionedData )
val localRes = dataSet
.partitionCustom(new myPartitioner(),0)
.mapPartition(new MapPartitionFunction[(Int,Int] {
override def mapPartition(values: lang.Iterable[(Int,String)],out: Collector[Int]) = {
var count = 0;
values.forEach(new Consumer[(Int,String)] {
override def accept(t: (Int,String)): Unit = {
count=count+1;
print("current count is " + count + " tuple is " + t + "\n");
}
})
out.collect(count)
}
})
localRes.collect().foreach(println)
}
class myPartitioner extends Partitioner[Int]{
override def partition(key: Int,numPartitions: Int) = {
// println("numPartitions="+numPartitions)
key
}
}
}
谢谢!
解决方法
分区数是并行度,可以在提交作业时在命令行中设置,也可以在flink-conf.yaml中设置。