网站首页 > 教程文章 正文
双值类型
两个数据源之间的关联操作。
intersection
函数签名
def intersection(other: RDD[T]): RDD[T]
函数说明
对源RDD和参数RDD求交集后返回一个新的RDD。
union()
函数签名
def union(other: RDD[T]): RDD[T]
函数说明
对源RDD和参数RDD求并集后返回一个新的RDD。
subtract()
函数签名
def subtract(other: RDD[T]): RDD[T]
函数说明
以一个RDD元素为主,去除两个RDD中重复元素,将其他元素保留下来。求差集。
intersection() 、union()、subtract() 不支持两个数据类型不一致的 RDD 进行求交集操作,参数要求与原 RDD 的元素类型一致。
zip()
函数签名
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
函数说明
将两个 RDD 中的元素,以键值对的形式进行合并。其中,键值对中的Key为第1个 RDD 中的元素,Value 为第 2 个 RDD 中的相同位置的元素。
拉链操作,两个 RDD 的数据类型可以不一致。
分区数不相等时,不能进行拉链操作。
val rdd1 = sc.makeRDD(List(1, 2, 3, 4), numSlices = 2)
val rdd2 = sc.makeRDD(List(3, 4, 5, 6), numSlices = 4)
// 拉链 期望【(1,3),(2,4),(3,5),(4,6)】
val rdd3 = rdd1.zip(rdd2)
println("Zip:" + rdd3.collect().mkString(","))
会报如下错误:
Exception in thread "main" java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(2, 4)
两个数据源中,分区中的数据数量也要保持一致。
val rdd4 = sc.makeRDD(List(1, 2, 3, 4, 5, 6), numSlices = 2)
val rdd5 = sc.makeRDD(List(3, 4, 5, 6), numSlices = 2)
// 拉链 期望【(1,3),(2,4),(3,5),(4,6)】
val rdd6 = rdd4.zip(rdd5)
println("Zip:" + rdd6.collect().mkString(","))
否则报如下错误:
org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition
综合示例:
object RDD_Transform_MultipleValue {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 双Value 类型
val rdd1 = sc.makeRDD(List(1, 2, 3, 4))
val rdd2 = sc.makeRDD(List(3, 4, 5, 6))
// 交集 期望【3,4】
val rdd3 = rdd1.intersection(rdd2)
println("Intersection:" + rdd3.collect().mkString(","))
// 并集 期望【1,2,3,4,3,4,5,6】
val rdd4 = rdd1.union(rdd2)
println("Union:" + rdd4.collect().mkString(","))
// 差集 期望【1,2】
val rdd5 = rdd1.subtract(rdd2)
println("Subtract:" + rdd5.collect().mkString(","))
// 拉链 期望【(1,3),(2,4),(3,5),(4,6)】
val rdd6 = rdd1.zip(rdd2)
println("Zip:" + rdd6.collect().mkString(","))
// Step3: 关闭环境
sc.stop()
}
}
输出结果:
Intersection:3,4
Union:1,2,3,4,3,4,5,6
Subtract:1,2
Zip:(1,3),(2,4),(3,5),(4,6)
key-value类型
partitionBy()
函数签名
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
函数说明
将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner。
partitionBy() 算子并不是属于 RDD 的方法,而是属于 PairRDDFunctions 类的方法,要求 RDD 必须是 Key-Value 类型的 RDD。Scala 隐式转换,程序在编译错误时,会尝试在作用域范围内查找转换规则,将类型转换成特定类型后编译通过。隐式转换,相当于一种二次编译。
RDD 中有 rddToPairRDDFunctions() 隐式函数,尝试将 [K,V] 类型的 RDD 转换成 PairRDDFunctions 类型。
partitionBy() 要与 coalesce()、repartition() 算子区分开,前者是将其数据进行重分区;后者是调整分区的数量。
object RDD_Transform_partitionBy {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 partitionBy
val rdd = sc.makeRDD(List(1, 2, 3, 4))
val mapRDD:RDD[(Int, Int)] = rdd.map((_, 1))
mapRDD.partitionBy(new HashPartitioner(partitions = 2)).saveAsTextFile("output")
// Step3: 关闭环境
sc.stop()
}
}
最后的输出:
最终实现【1,3】【2,4】分组。
【说明1】HashPartitioner 继承自 Partitioner,有两个主要方法:
- numPartitions:获取分区数量;
- getPartition:获取分区号,返回一个整数值,其实就是 key 值对于分区号进行取模计算,获得分区值。
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
nonNegativeMod() 方法源码:
def nonNegativeMod(x: Int, mod: Int): Int = {
val rawMod = x % mod
rawMod + (if (rawMod < 0) mod else 0)
}
【说明2】如果 RDD 连续进行两次 partitionBy() 算子操作,partitionBy() 算子会对自身的 partitioner 对象进行比较。Scala 的 == 会进行类型和非空比较。
partithoner 自身又有 equals() 方法,比较两个分区器是否同一。比较逻辑,要看:类型、分区数量。如果同一,分区器将不做任何处理,保留self 的分区器。
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}
【问题3】其他类型分区器 Partitioner 有 3 个子类。其中 PythonPartitioner (包含小锁头)说明其是在特定包下运行的分区器。RangePartitioner 多用于排序操作。
reduceByKey()
函数签名
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
函数说明
可以将数据按照相同的 Key 对 Value 进行聚合。
- Scala 语音中一般聚合操作都是两两聚合,spark 基于 Scala 开发,所以它的聚合也是两两聚合。
- reduceByKey() 中如果 Key 的数据只有一个,则该 Key 不参与运算。
object RDD_Transform_reduceByKey {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 reduceByKey:相同的 Key 的数据进行 value 数据的聚合操作
// Scala 语音中一般聚合操作都是两两聚合,spark 基于 Scala 开发,所以它的聚合也是两两聚合
val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("b",4)))
val reduceRDD = rdd.reduceByKey(
(x: Int, y: Int) => {
println(s"x=${x}, y=${y}")
(x + y)
})
reduceRDD.collect().foreach(println)
// Step3: 关闭环境
sc.stop()
}
}
输出结果:
x=1, y=2
x=3, y=3
(a,6)
(b,4)
groupByKey()
函数签名
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
函数说明
将数据源的数据根据 key 对 value 进行分组。
object RDD_Transform_groupByKey {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 groupByKey
val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("b",4)))
val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()
groupRDD.collect().foreach(println)
// Step3: 关闭环境
sc.stop()
}
}
输出结果:
(a,CompactBuffer(1, 2, 3))
(b,CompactBuffer(4))
groupByKey() 与 groupBy()的区别:
- groupByKey():1)通过 Key 进行分组;2)返回 RDD[(String, Iterable[Int])],聚合后的结果。
val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()
- groupBy():1)可以使用元组中任意元素进行分组;2)返回的是分组后的元组,元组仍然是原始的元组。
val groupRDD1: RDD[(String, Iterable[(String, Int)])]=rdd.groupBy(_._1)
groupByKey() 与 reduceByKey() 的区别:
从性能上看:reduceByKey 在分区内进行预聚合(分区内做聚合),在本地将数据量进行压缩,可以使 shuffle 落盘时数据量减少,同时在 reduce 时从文件读取的数据量的大小也进行压缩,从而提高 shuffle 的效果。如果进行聚合,reduceByKey() 性能较高
从功能上看:如果只分组,那只能使用 groupByKey()。
reduceByKey() 分区内和分区间计算规则相同。
aggregateByKey()
函数签名
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)]
aggregateByKey() 存在参数柯里化,接受两个参数列表。
- 第一个参数列表, 需要传递一个参数:表示初始值。主要用于当我们遇到第一个 key 时,和 value 进行分区计算
- 第二个参数列表:
- 第一个参数表示分区内计算规则;
- 第二个参数表示分区件计算规则。
函数说明
将数据根据不同的规则进行分区内计算和分区间计算。
object RDD_Transform_aggregateByKey {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 aggregateByKey
// 数据分区【("a",1),("a",2)】【("a",3),("a",4)】
val rdd = sc.makeRDD(List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6)),numSlices = 2)
// aggregateByKey() 存在参数柯里化
// 第一个参数列表, 需要传递一个参数:表示初始值
// 主要用于当我们遇到第一个 key 时,和 value 进行分区计算
// 第二个参数列表:
// 第一个参数表示分区内计算规则
// 第二个参数表示分区件计算规则
rdd.aggregateByKey(zeroValue = 0)(
(x, y) => math.max(x,y),
(x, y) => x + y
).collect().foreach(println)
// Step3: 关闭环境
sc.stop()
}
}
输出结果:
(b,8)
(a,8)
修改 zeroValue 值后的效果:
object RDD_Transform_aggregateByKey_zeroValue {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 aggregateByKey
// 数据分区【("a",1),("a",2)】【("a",3),("a",4)】
val rdd = sc.makeRDD(List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6)),numSlices = 2)
// aggregateByKey() 存在参数柯里化
// 第一个参数列表, 需要传递一个参数:表示初始值
// 主要用于当我们遇到第一个 key 时,和 value 进行分区计算
// 第二个参数列表:
// 第一个参数表示分区内计算规则
// 第二个参数表示分区件计算规则
rdd.aggregateByKey(zeroValue = 5)(
(x, y) => math.max(x,y),
(x, y) => x + y
).collect().foreach(println)
// Step3: 关闭环境
sc.stop()
}
}
输出结果,如预期:
(b,8)
(a,8)
aggregateByKey() 方法,如果分区内、分区间使用相同的聚合函数,则效果与 reduceByKey() 相同。
rdd.aggregateByKey(zeroValue = 0)(
(x, y) => x + y,
(x, y) => x + y
)
// 简化写法:匿名函数的字典原则
rdd.aggregateByKey(zeroValue = 0)( _+_,_+_)
应用场景,按 Key 值取平均数。
object RDD_Transform_aggregateByKey_average {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 aggregateByKey
// 数据分区【("a",1),("a",2)】【("a",3),("a",4)】
val rdd = sc.makeRDD(List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6)),numSlices = 2)
// 初始值 (0,0),tuple 的第一个元素为求和,第二个值为 key 出现的次数
val averageRDD: RDD[(String,(Int, Int))] = rdd.aggregateByKey(zeroValue = (0, 0))(
(t, v) => {
// 分区内,tuple 第一个元素为值相加,第二个元素为次数相加
(t._1 + v, t._2 + 1)
},
// 分区间,tuple 中的元素,第一个元素值相加,第二个元素数量相加
(t1, t2) => {
(t1._1 + t2._1, t1._2 + t2._2)
}
)
// RDD 中,如果 Key 保持不变,只对 Value 进行处理,可以使用 mapValues() 算子
averageRDD.mapValues{
case (num, cnt) => {
num / cnt
}
}.collect().foreach(println)
// Step3: 关闭环境
sc.stop()
}
}
输出结果:
(b,4)
(a,3)
- 上一篇: 9 RDD 常用算子(4)(dirac算子)
- 下一篇: 英雄无敌3官图 卷土重来 菜鸟攻略
猜你喜欢
- 2025-07-09 密码被破译多可怕?被美军全程盯梢,日本海军军神命丧太平洋
- 2025-07-09 NEA半决赛巅峰较量 天禄&BOF会师决赛
- 2025-07-09 美团图灵机器学习平台性能起飞的秘密(一)
- 2025-07-09 云南大山里有一个小人国,居民身高都不到1米3,背后故事让人落泪
- 2025-07-09 宝石争霸地图解析!(宝石争霸攻略)
- 2025-07-09 如何正确使用chunkbase网站?让你在联机时领先朋友一大步
- 2025-07-09 Java对象拷贝原理剖析及最佳实践(java拷贝一个对象)
- 2025-07-09 英雄无敌3官图 卷土重来 菜鸟攻略
- 2025-07-09 9 RDD 常用算子(4)(dirac算子)
- 2025-07-09 自小编抢注丁丁网的O2O域名ddo2o.com怎么没动作了.
- 最近发表
- 标签列表
-
- location.href (44)
- document.ready (36)
- git checkout -b (34)
- 跃点数 (35)
- 阿里云镜像地址 (33)
- qt qmessagebox (36)
- mybatis plus page (35)
- vue @scroll (38)
- 堆栈区别 (33)
- 什么是容器 (33)
- sha1 md5 (33)
- navicat导出数据 (34)
- 阿里云acp考试 (33)
- 阿里云 nacos (34)
- redhat官网下载镜像 (36)
- srs服务器 (33)
- pico开发者 (33)
- https的端口号 (34)
- vscode更改主题 (35)
- 阿里云资源池 (34)
- os.path.join (33)
- redis aof rdb 区别 (33)
- 302跳转 (33)
- http method (35)
- js array splice (33)