class HashPartitioner(partitions: Int) extends Partitioner {
  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner => h.numPartitions == numPartitions
    case _ => false
  }

  override def hashCode: Int = numPartitions
}
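Note: hash partitioning can cause data skew. When many records share the same key, or many keys hash to the same remainder, a few partitions receive most of the data.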
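To make the skew concrete, here is a minimal sketch in plain Scala with no Spark dependency. The object `HashPartitionSketch`, the helper `partitionSizes`, and the sample keys are hypothetical names introduced only for illustration; `nonNegativeMod` is written here as an assumption about what `Utils.nonNegativeMod` does (shift a negative remainder back into the range `[0, mod)`).

```scala
object HashPartitionSketch {
  // Scala's % can return a negative remainder for negative hash codes,
  // so shift the result back into the range [0, mod).
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  // Same rule as getPartition above: null keys go to partition 0,
  // every other key goes to hashCode mod numPartitions.
  def getPartition(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case _    => nonNegativeMod(key.hashCode, numPartitions)
  }

  // Count how many keys land in each partition.
  def partitionSizes(keys: Seq[Any], numPartitions: Int): Map[Int, Int] =
    keys
      .groupBy(k => getPartition(k, numPartitions))
      .map { case (partition, ks) => partition -> ks.size }

  def main(args: Array[String]): Unit = {
    // Hypothetical skewed input: every key is a multiple of 4, and an Int's
    // hashCode is the value itself, so with 4 partitions all keys collide.
    val skewedKeys: Seq[Int] = (0 until 100).map(_ * 4)
    println(partitionSizes(skewedKeys, 4))

    // Consecutive keys spread evenly over the same 4 partitions.
    val evenKeys: Seq[Int] = 0 until 100
    println(partitionSizes(evenKeys, 4))
  }
}
```

In the first case all 100 keys end up in partition 0 while the other three partitions stay empty, which is the kind of imbalance the note warns about.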
RangePartitioner
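Partitioning approach:
Determine the partition boundaries of the child RDD from the data distribution of the parent RDD.
Given a key-value record, quickly locate the index of the partition it should be assigned to, based on its key.
Note: the boundary array is determined by reservoir sampling, and the partition index for a key is then looked up against that array. See the source code for implementation details.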
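The two steps above can be sketched in plain Scala as follows. This is a simplified illustration, not Spark's actual code: it assumes `Int` keys and a single input stream, whereas the real `RangePartitioner` works on generic ordered keys and samples across the partitions of the parent RDD. The names `RangePartitionSketch`, `reservoirSample`, `determineBounds`, and `getPartition` are chosen here for illustration only.

```scala
import scala.util.Random

object RangePartitionSketch {
  // Reservoir sampling (Algorithm R): keep k items from a stream of unknown
  // length so that every item has the same chance of being retained.
  def reservoirSample(input: Iterator[Int], k: Int, rng: Random): Array[Int] = {
    val reservoir = new Array[Int](k)
    var seen = 0
    while (input.hasNext) {
      val item = input.next()
      if (seen < k) {
        reservoir(seen) = item          // fill the reservoir first
      } else {
        val j = rng.nextInt(seen + 1)   // uniform index in [0, seen]
        if (j < k) reservoir(j) = item  // replace with probability k / (seen + 1)
      }
      seen += 1
    }
    if (seen < k) reservoir.take(seen) else reservoir
  }

  // Pick numPartitions - 1 upper bounds at evenly spaced positions
  // of the sorted sample.
  def determineBounds(sample: Array[Int], numPartitions: Int): Array[Int] = {
    val sorted = sample.sorted
    if (sorted.isEmpty) Array.empty[Int]
    else (1 until numPartitions).map(i => sorted(i * sorted.length / numPartitions)).toArray
  }

  // Binary search for the first bound that is >= key. Partition p holds the
  // keys with bounds(p - 1) < key <= bounds(p); keys above every bound go
  // to the last partition.
  def getPartition(key: Int, bounds: Array[Int]): Int = {
    var lo = 0
    var hi = bounds.length
    while (lo < hi) {
      val mid = (lo + hi) >>> 1
      if (bounds(mid) < key) lo = mid + 1 else hi = mid
    }
    lo
  }

  def main(args: Array[String]): Unit = {
    val sample = reservoirSample((1 to 10000).iterator, 100, new Random(42))
    val bounds = determineBounds(sample, 4)
    println(bounds.mkString(", "))
    println(getPartition(5000, bounds))
  }
}
```

Because the boundaries come from a sample of the actual keys, the resulting partitions hold roughly equal numbers of records and stay ordered relative to each other, which is what distinguishes this scheme from hash partitioning.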
Custom partitioner
class TestPartitioner(Partitions: Int) extends Partitioner {
  override def numPartitions: Int = Partitions

  override def getPartition(key: Any): Int = {
    val a = if (<xxx>) {
      1
    } else if (<xxx>) {
      2
    } else {
      0
    }
    a
  }