Spark Partitioner

Spark Partitioner

简介

Spark 中的数据可以被分区器(Partitioner)重新分配分区,解决数据倾斜等问题。

分区函数

Spark 内置了 HashPartitioner 和 RangePartitioner 两种分区器,并且用户可以编写一个 Partitioner 的子类完成自定义分区器。

HashPartitioner

HashPartitioner 为默认分区器

分区方式:

哈希取模,具体源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class HashPartitioner(partitions: Int) extends Partitioner {
def numPartitions: Int = partitions

def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}

override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}

override def hashCode: Int = numPartitions
}

注:此方式可能导致数据偏移。

RangePartitioner

分区方式:

  1. 根据父 RDD 的数据特征,确定子 RDD 分区的边界
  2. 给定一个键值对数据,能够快速根据键值定位其所应该被分配的分区编号

注:通过水塘抽样算法确定边界数组,再根据 key 来获取所在的分区索引。具体实现细节参见源码。

自定义分区器

1
2
3
4
5
6
7
8
9
10
11
12
class TestPartitioner(Partitions:Int) extends Partitioner {
override def numPartitions: Int = Partitions
override def getPartition(key: Any):Int = {
val a = if (<xxx>) {
1
}else if (<xxx>){
2
}else{
0
}
a
}

参考资料

官方文档

Apache Spark 源码阅读

源码


Spark Partitioner
https://wangqian0306.github.io/2022/partitioner/
作者
WangQian
发布于
2022年7月19日
许可协议