如何更改 Flink DataSet 的默认分区数?

问题描述

这里有一个需求:数据集太大,我们需要对数据进行分区,在每个分区计算一个本地结果,然后合并。例如,如果有 100 万条数据分成 100 个分区,那么每个副本将只有大约 10000 条数据。由于需要使用分区数进行调优,分区数需要可变。另外,一个分区的所有数据必须批量计算,不能一一计算。

实现如下: 在分区阶段之后,每条数据都会有一个键来表示它所属的分区。现在,数据应该是这样的:afterPartitionedData=[(0,data1),(0,data2)…(1,data3),(1,data4),…,(99,datan)]。 接下来,使用 Flink 的 partitionCustommapPartition 运算符。

  dataSet = env. fromCollection(afterPartitionedData)
  dataset
      .partitionCustom(new myPartitioner(),0)
      .mapPartition(new myMapPartitionFunction[(Int,String),Int]())
…
…
  class myPartitioner extends  Partitioner[Int]{
    override def partition(key: Int,numPartitions: Int) = {
      println("numPartitions="+numPartitions) // 6,cpu number
      key // just return the partitionID
    }
  }

然而,却报错:

...
Caused by: java.lang.Arrayindexoutofboundsexception: 6
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:226)
...

这个好像是因为Flink的认分区数DataSetcpu数量,在我的电脑上是6,所以会报 java.lang.Arrayindexoutofboundsexception : 6

那么我的问题是:有没有办法随意改变分区数?我在 API Partitioner方法 Partition (key: int,numpartitions: int) 中找到了这个参数,但不知道如何更改它。

有没有办法改变DataSet分区的数量

Flink 版本为 1.6,测试代码为:

object SimpleFlinkFromBlog {

  def main(args: Array[String]): Unit = {
    val  env  =  ExecutionEnvironment.getExecutionEnvironment
    val afterPartitionedData = new mutable.MutableList[(Int,String)]
    afterPartitionedData.+=((0,"0"))

    afterPartitionedData.+=((1,"1"))

    afterPartitionedData.+=((2,"2"))
    afterPartitionedData.+=((2,"2"))

    afterPartitionedData.+=((3,"3"))
    afterPartitionedData.+=((3,"3"))

    afterPartitionedData.+=((4,"4"))

    afterPartitionedData.+=((5,"5"))
    afterPartitionedData.+=((5,"5"))

    // Comment this line will not report an error.
    // java.lang.Arrayindexoutofboundsexception : 6
    afterPartitionedData.+=((6,"will wrong"))

    val dataSet = env.fromCollection( afterPartitionedData )
    val localRes = dataSet
      .partitionCustom(new myPartitioner(),0)
      .mapPartition(new MapPartitionFunction[(Int,Int] {
        override def mapPartition(values: lang.Iterable[(Int,String)],out: Collector[Int]) = {
          var count = 0;
          values.forEach(new Consumer[(Int,String)] {
            override def accept(t: (Int,String)): Unit = {
              count=count+1;
              print("current count is " + count + "   tuple is " + t + "\n");
            }
          })
          out.collect(count)
        }
      })

    localRes.collect().foreach(println)
  }

  class myPartitioner extends  Partitioner[Int]{
    override def partition(key: Int,numPartitions: Int) = {
//      println("numPartitions="+numPartitions)
      key
    }
  }
}

谢谢!

解决方法

分区数是并行度,可以在提交作业时在命令行中设置,也可以在flink-conf.yaml中设置。