| 2015年11月30日
在Spark中,存在两类分区函数:HashPartitioner和RangePartitioner,它们都是继承自Partitioner,主要提供了每个RDD有几个分区(numPartitions)以及对于给定的值返回一个分区ID(0~numPartitions-1),也就是决定这个值是属于那个分区的。
1 HashPartitioner分区
HashPartitioner分区的原理很简单,对于给定的key,计算其hashCode,并除于分区的个数取余,如果余数小于0,则用余数+分区的个数,最后返回的值就是这个key所属的分区ID。实现如下:
class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
def numPartitions: Int = partitions // 分片数初始化
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
//对key的hashCode进行按照numPartitions取模,这里返回的是一个正整数。
}
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}
override def hashCode: Int = numPartitions
}
下面是string的hashCode的实现,这里可以看出这hash其实就是很简单的一个字符串按int累加。最后返回的也是一个整型值。
public int hashCode() {
int h = hash;
if (h == 0 && value.length > 0) {
char val[] = value;
for (int i = 0; i < value.length; i++) {
h = 31 * h + val[i];
}
hash = h;
}
return h;
}
从这里看出partitioner的实现是非常简单的,但是实际工作中这个partitioner规则有可能要我们按照自己的数据规则重新定义,就需要扩展partitioner了。下面是我们扩展cityhash算法的partitioner。
2 自定义Partitioner扩展
import org.apache.spark.Partitioner;
public class CityHashPartitioner extends Partitioner {
static {
System.loadLibrary("cityhash"); //加载cityhash的so文件
}
private int numParts;
public CityHashPartitioner(int numParts) {
this.numParts = numParts;
}
public int getPartition(Object key) {
return (int) cityhashJNI.CityHash64IdataMod(key.toString(), key.toString().length(), numParts);
//调用cityhash的取模函数,进行取模计算。
}
public int numPartitions() {
return numParts;
}
}
还是比较简单,使用时直接按照下面的方式使用即可。
CityHashPartitioner partitioner = new CityHashPartitioner(partnum);```
world_rdd.repartitionAndSortWithinPartitions(partitioner)
.saveAsHadoopFile(world_out_path, String.class,
String[].class, TextFileOutFormat.class);
TextFileOutFormat又是一个自定义的文件输出类。
关注「黑光技术」,关注大数据+微服务