云计算、AI、云原生、大数据等一站式技术学习平台

网站首页 > 教程文章 正文

9 RDD 常用算子(4)(dirac算子)

jxf315 2025-07-09 13:38:59 教程文章 5 ℃

接上一期,key-value 类型算子。

foldByKey()

函数签名

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

函数说明

当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey。

object RDD_Transform_foldByKey {

  def main(args: Array[String]): Unit = {
    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 foldByKey
    // 数据分区【("a",1),("a",2)】【("a",3),("a",4)】
    val rdd = sc.makeRDD(List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6)),numSlices = 2)
    rdd.foldByKey(zeroValue = 0)(_+_).collect.foreach(println)
    // Step3: 关闭环境
    sc.stop()
  }
}

combineByKey()

函数签名

def combineByKey[C](
     createCombiner: V => C,
     mergeValue: (C, V) => C,
     mergeCombiners: (C, C) => C): RDD[(K, C)]

算子有三个参数:

  • 第一个参数:相同 Key 第一个数据进行数据转换,实现操作;
  • 第二个参数:分区内计算规则;
  • 第三个参数:分区间计算规则。

函数说明

最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于 aggregate(),combineByKey()允许用户返回值的类型与输入不一致。

object RDD_Transform_combineByKey {

  def main(args: Array[String]): Unit = {
    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 combineByKey
    // 数据分区【("a",1),("a",2)】【("a",3),("a",4)】
    val rdd = sc.makeRDD(List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6)),numSlices = 2)
    val combineRDD = rdd.combineByKey(
      v => (v, 1),
      // 由于 v 的类型在运行时才能确定,因此此处要指定数据类型
      (t: (Int, Int), v) => {
        (t._1 + v, t._2 + 1)
      },
      (t1: (Int, Int), t2: (Int, Int)) => {
        (t1._1 + t2._1, t1._2 + t2._2)
      }
    )
    combineRDD.mapValues{
      case (num, cnt) => {
        num / cnt
      }
    }.collect().foreach(println)
    // Step3: 关闭环境
    sc.stop()
  }
}

输出结果:

(b,4)
(a,3)

reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别

样例代码:

object RDD_Transform_aggregateByKey_synthesize {

  def main(args: Array[String]): Unit = {
    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 聚合算子综合
    // 数据分区【("a",1),("a",2)】【("a",3),("a",4)】
    val rdd = sc.makeRDD(List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6)),numSlices = 2)
    rdd.reduceByKey(_+_)  // 计算 wordCount
    rdd.aggregateByKey(zeroValue = 0)(_+_, _+_) // 计算 wordCount
    rdd.foldByKey(zeroValue = 0)(_+_) // 计算 wordCount
    rdd.combineByKey(v=>v,(x:Int, y)=>x+y,(x:Int,y:Int)=>x+y)// 计算 wordCount
    // Step3: 关闭环境
    sc.stop()
  }
}

源码分析:

reduceByKey:
combineByKeyWithClassTag[V](
    (v: V) => v, // 第一个值不会参与计算
    func, // 分区间数据处理函数(分区内、分区间规则相同)
    func, // 分区间数据处理函数(分区内、分区间规则相同)
    partitioner)

aggregateByKey:
combineByKeyWithClassTag[U](
    (v: V) => cleanedSeqOp(createZero(), v),// 初始值和第一个value的数据进行的分区内处理
    cleanedSeqOp, // 分区内数据处理函数(分区内、分区间规则不同,cleanedSeqOp/combOp)
    combOp, // 分区间数据处理函数(分区内、分区间规则不同,cleanedSeqOp/combOp)
    partitioner)

foldByKey:
combineByKeyWithClassTag[V](
    (v: V) => cleanedFunc(createZero(), v),// 初始值和第一个value的数据进行的分区内处理
    cleanedFunc,  // 分区内数据处理函数(分区内、分区间规则相同)
    cleanedFunc,  // 分区间数据处理函数(分区内、分区间规则相同)
    partitioner)

combineByKey:
combineByKeyWithClassTag(
    createCombiner,  // 相同 Key 第一条数据所做的处理
    mergeValue,      // 分区内数据处理函数
    mergeCombiners,  // 分区间数据处理函数
    partitioner, mapSideCombine, serializer)(null)

sortByKey()

函数签名

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) : RDD[(K, V)]

函数说明

在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序的。

object RDD_Transform_sortByKey {

  def main(args: Array[String]): Unit = {
    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 sortByKey
    // 数据分区【("a",1),("a",2)】【("a",3),("a",4)】
    val rdd = sc.makeRDD(List(("b",3),("b",4),("a",1),("a",2),("b",5),("a",6)),numSlices = 2)

    rdd.sortBy(_._1).collect().foreach(println)
    // Step3: 关闭环境
    sc.stop()
  }
}

返回结果:

(a,1)
(a,2)
(a,6)
(b,3)
(b,4)
(b,5)

join()

函数签名

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

函数说明

在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的(K,(V,W))的 RDD。

  • 将两个不同的数据集相同的 Key 的 value 连接在一起,形成元组。
object RDD_Transform_join {

  def main(args: Array[String]): Unit = {
    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 join
    // 数据分区【("a",1),("a",2)】【("a",3),("a",4)】
    val rdd1 = sc.makeRDD(List(("a",1),("b",2),("c",3)))
    val rdd2 = sc.makeRDD(List(("b", 4), ("c", 5), ("a", 6)))

    val joinRDD = rdd1.join(rdd2)
    joinRDD.collect().foreach(println)
    // Step3: 关闭环境
    sc.stop()
  }
}

输出结果:

(a,(1,6))
(b,(2,4))
(c,(3,5))
  • 如果两个数据源中 Key 没有匹配上,那么数据就不会出现在结果中。下例中,rdd1 中的 ("b",2) 元组和 rdd2 中的 ("d", 4) 元组都没有匹配到相同 Key 值的数据,因此在结果中都不予显示。
object RDD_Transform_join_notMatch {

  def main(args: Array[String]): Unit = {
    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 join
    // 数据分区【("a",1),("a",2)】【("a",3),("a",4)】
    val rdd1 = sc.makeRDD(List(("a",1),("b",2),("c",3)))
    val rdd2 = sc.makeRDD(List(("d", 4), ("c", 5), ("a", 6)))

    val joinRDD = rdd1.join(rdd2)
    joinRDD.collect().foreach(println)
    // Step3: 关闭环境
    sc.stop()
  }
}

输出结果:

(a,(1,6))
(c,(3,5))
  • 当出现多个 Key 值匹配,会一次匹配,可能会出现笛卡尔积,数据量会几何级增长。
object RDD_Transform_join_Cartesian {

  def main(args: Array[String]): Unit = {
    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 join
    // 数据分区【("a",1),("a",2)】【("a",3),("a",4)】
    val rdd1 = sc.makeRDD(List(("a",1),("b",2),("c",3)))
    val rdd2 = sc.makeRDD(List(("a", 4), ("c", 5), ("a", 6),("b",7)))

    val joinRDD = rdd1.join(rdd2)
    joinRDD.collect().foreach(println)
    // Step3: 关闭环境
    sc.stop()
  }
}

输出结果:

(a,(1,4))
(a,(1,6))
(b,(2,7))
(c,(3,5))

leftOuterJoin()

函数签名

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

函数说明

类似于 SQL 语句的左外连接。

以下例子,以 rdd1 为主表进行关联,如果没有匹配上会使用一个 None 填充后形成元组。

object RDD_Transform_leftOuterJoin {

  def main(args: Array[String]): Unit = {
    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 leftOuterJoin
    // 数据分区【("a",1),("a",2)】【("a",3),("a",4)】
    val rdd1 = sc.makeRDD(List(("a",1),("b",2),("c",3),("d",4)))
    val rdd2 = sc.makeRDD(List(("c", 5), ("a", 6)))

    val joinRDD = rdd1.leftOuterJoin(rdd2)
    joinRDD.collect().foreach(println)
    // Step3: 关闭环境
    sc.stop()
  }
}

输出结果:

(a,(1,Some(6)))
(b,(2,None))
(c,(3,Some(5)))
(d,(4,None))

同样可以使用 rightOuterJoin() 算子进行右外连接,令 rdd2 作为主表。

object RDD_Transform_rightOuterJoin {

  def main(args: Array[String]): Unit = {
    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 rightOuterJoin
    // 数据分区【("a",1),("a",2)】【("a",3),("a",4)】
    val rdd1 = sc.makeRDD(List(("a",1),("b",2),("c",3),("d",4)))
    val rdd2 = sc.makeRDD(List(("c", 5), ("a", 6)))

    val joinRDD = rdd1.rightOuterJoin(rdd2)
    joinRDD.collect().foreach(println)
    // Step3: 关闭环境
    sc.stop()
  }
}

输出结果:

(a,(Some(1),6))
(c,(Some(3),5))

cogroup()

函数签名

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

函数说明

在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD。相同的 Key 放在一个组中,连接在一起。

  • 相同数据源的数据先进行分组,再与其他数据源进行连接。如下例,rdd1 中没有 key 为 c 的数据,因此在连接时,key=c 的数据为空。
object RDD_Transform_cogroup {

  def main(args: Array[String]): Unit = {
    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 cogroup
    val rdd1 = sc.makeRDD(List(("a",1),("b",2)))
    val rdd2 = sc.makeRDD(List(("b", 5), ("a", 6),("c",4)))

    val cgRDD:RDD[(String,(Iterable[Int],Iterable[Int]))] = rdd1.cogroup(rdd2)
    cgRDD.collect().foreach(println)
    // Step3: 关闭环境
    sc.stop()
  }
}

输出结果:

(a,(CompactBuffer(1),CompactBuffer(6)))
(b,(CompactBuffer(2),CompactBuffer(3)))
(c,(CompactBuffer(),CompactBuffer(5)))
  • 相同数据源的数据先进行分组,再与其他数据源进行连接。如下例,同一分区内存在相同 Key 值的数据,先进行分组,之后再进行连接。
object RDD_Transform_cogroup2 {

  def main(args: Array[String]): Unit = {
    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 cogroup
    val rdd1 = sc.makeRDD(List(("a",1),("b",2)))
    val rdd2 = sc.makeRDD(List(("b", 5), ("a", 6),("c",4),("c",3)))

    val cgRDD:RDD[(String,(Iterable[Int],Iterable[Int]))] = rdd1.cogroup(rdd2)
    cgRDD.collect().foreach(println)
    // Step3: 关闭环境
    sc.stop()
  }
}

输出结果:

(a,(CompactBuffer(1),CompactBuffer(6)))
(b,(CompactBuffer(2),CompactBuffer(5)))
(c,(CompactBuffer(),CompactBuffer(4, 3)))
  • cogroup() 算子最多支持 3 个其他 RDD 连接。

到此,RDD 转换算子已经总结完成,下一期会介绍 RDD 行动算子。

Tags:

最近发表
标签列表