Spark RDD

Spark RDD

简介

Spark 的设计围绕弹性分布式数据集(RDD) 的概念展开,RDD 是可以并行操作的元素的容错集合。 它是一个不可变的分布式对象集合。 RDD 中的每个数据集都被划分为逻辑分区,可以在集群的不同节点上进行计算。 RDD 可以包含任何类型的 Python、Java 或 Scala 对象,包括用户定义的类。 有两种方法可以创建 RDD:并行化 驱动程序中的现有集合,或引用外部存储系统中的数据集,例如共享文件系统、HDFS、HBase 或任何提供 Hadoop InputFormat 的数据源。

在高层次上,每个 Spark 应用程序都包含一个驱动程序(driver program),该驱动程序运行用户的 main 函数并在集群上执行各种并行操作。 Spark 提供的主要抽象是弹性分布式数据集(RDD resilient distributed dataset),它是跨集群节点分区的元素集合,可以并行操作。 RDD 是通过从 Hadoop 文件系统(或任何其他 Hadoop 支持的文件系统)中的文件或驱动程序中现有的 Scala 集合开始并对其进行转换来创建的。 用户还可以要求 Spark 将 RDD 持久化到内存中,以便在并行操作中有效地重用它。最后,RDD 会自动从节点故障中恢复。

Spark 中除了 RDD 之外还有一个抽象即:可以在并行操作中使用的共享变量。 默认情况下,当 Spark 在不同节点上并行运行一个函数作为一组任务时,它会将函数中使用的每个变量的副本发送到每个任务。 有时,需要在任务之间或在任务和驱动程序之间共享变量。 Spark 支持两种类型的共享变量:广播变量,可用于在所有节点的内存中缓存值,以及累加器,它们是仅“添加”到的变量,例如计数器和求和计算。

操作类型

RDD 支持两种类型的操作:

  • transformations(从现有数据集创建新数据集)
  • actions(在对数据集运行计算后将值返回给驱动程序)

例如,map 是一种通过函数传递每个数据集元素并返回表示结果的新 RDD 的转换。另一方面,reduce 是使用某个函数聚合 RDD 的所有元素并将最终结果返回给驱动程序的操作(尽管也有 reduceByKey 返回分布式数据集的并行操作)。 Spark 中的所有转换都是惰性的,因为它们不会立即计算结果。 相反,他们只记得应用于某些基础数据集(例如文件)的转换。仅当操作需要将结果返回给驱动程序时才计算转换。 这种设计使 Spark 能够更高效地运行。例如,我们可以意识到通过创建的数据集map将在 a 中使用,reduce并且仅将结果返回reduce给驱动程序,而不是更大的映射数据集。

默认情况下,每个转换后的 RDD 可能会在您每次对其运行操作时重新计算。但是,您也可以使用(or) 方法将 RDD 持久化到内存中,在这种情况下,Spark 会将元素保留在集群上,以便下次查询时更快地访问它。还支持在磁盘上持久化 RDD,或跨多个节点复制。

Shuffle

Spark 中的某些操作会触发一个称为 shuffle 的事件。shuffle 是 Spark 用于重新分配数据的机制,以便跨分区以不同方式分组。这通常涉及跨执行器和机器复制数据,使 Shuffle 成为一项复杂且成本高昂的操作。

要了解在 shuffle 期间发生了什么,我们可以考虑 reduceByKey 操作。reduceByKey 操作会生成一个新的 RDD,返回其中单个键的所有值都组合成一个元组-键和针对与该键关联的所有值执行 reduce 函数的结果。 挑战在于,并非单个键的所有值都必须位于同一分区甚至同一台机器上,但它们必须位于同一位置才能得出计算结果。

在 Spark 中,数据通常不会跨分区分布在特定操作的必要位置。在计算过程中,单个任务将在单个分区上运行 - 因此,为了组织单个 reduceByKey 的 reduce 任务执行的所有数据,Spark 需要执行 all-to-all 操作。 它必须从所有分区中读取以找到所有键的所有值,然后将跨分区的值组合在一起以计算每个键的最终结果 - 这称为 shuffle。

尽管新混洗数据的每个分区中的元素集合是确定性的,分区本身的顺序也是确定性的,但这些元素的顺序不是。如果希望在 shuffle 之后可预测有序的数据,那么可以使用:

  • mapPartitions 对每个分区进行排序,例如,.sorted
  • repartitionAndSortWithinPartitions 在重新分区的同时有效地对分区进行排序
  • sortBy 制作一个全局有序的 RDD

可能导致洗牌的操作包括重新分区操作,如 repartition 和 coalesce,'ByKey操作(计数除外),如 groupByKey 和 reduceByKey,以及 join 操作,如 cogroup 和 join。

持久性

Spark 中最重要的功能之一是跨操作将数据集持久化(或缓存)在内存中。 当你持久化一个 RDD 时,每个节点都会将它计算的任何分区存储在内存中,并在对该数据集(或从它派生的数据集)的其他操作中重用它们。 这使得未来的行动更快(通常超过 10 倍)。缓存是迭代算法和快速交互使用的关键工具。

可以使用 persist() 或 cache() 方法将 RDD 标记为持久化。第一次计算时,它将保存在节点的内存中。 Spark 的缓存是容错的——如果 RDD 的任何分区产生了丢失,它将使用最初创建它的转换自动重新计算。

此外,每个持久化的 RDD 可以使用不同的存储级别存储,例如,允许您将数据集持久化到磁盘上,将其持久化在内存中,但它会作为序列化的 Java 对象(以节省空间和跨节点复制)。 这些级别是通过将 StorageLevel 对象传递给 persist()。该 cache() 方法是使用默认存储级别的简写,即 StorageLevel.MEMORY_ONLY(将反序列化的对象存储在内存中)。

共享变量

通常,当传递给 Spark 操作的函数(例如map或reduce)在远程集群节点上执行时,它会在函数中使用的所有变量的单独副本上工作。这些变量被复制到每台机器上,并且对远程机器上的变量的更新不会传播回驱动程序。支持跨任务的通用读写共享变量效率低下。 然而,Spark 确实为两种常见的使用模式提供了两种有限类型的共享变量:广播变量(broadcast variables)和累加器(accumulators)。

广播变量

广播变量允许程序员在每台机器上缓存一个只读变量,而不是随任务一起发送它的副本。 例如,它们可用于以有效的方式为每个节点提供大型输入数据集的副本。Spark 还尝试使用高效的广播算法来分发广播变量,以降低通信成本。

Spark 动作通过一组阶段执行,由分布式 “shuffle” 操作隔离。Spark 自动广播每个阶段内任务所需的公共数据。 以这种方式广播的数据以序列化形式缓存,并在运行每个任务之前进行反序列化。 这意味着显式创建广播变量仅在跨多个阶段的任务需要相同数据或以反序列化形式缓存数据很重要时才有用。

可以通过 sc.broadcast() 方法构建广播变量。

累加器

累加器是仅通过关联和交换操作“添加”到的变量,因此可以有效地支持并行操作。它们可用于实现计数器(如在 MapReduce 中)或求和。Spark 原生支持数值类型的累加器,如果需要特定类型的累加器需要自行实现。

可以通过分别调用 SparkContext.longAccumulator()SparkContext.doubleAccumulator() 累加 Long 或 Double 类型的值来创建数值累加器。 然后可以使用该 add 方法将在集群上运行的任务添加到其中。但是,程序无法读取其值。只有驱动程序(Driver)可以使用其 value 方法读取累加器的值。

使用

Spark 官方已经不建议采用 RDD 这样的低级 API 了,如有需求应该选择 DataSet 或 DataFrame 这样的高级 API。

参考资料

官方文档


Spark RDD
https://wangqian0306.github.io/2022/rdd/
作者
WangQian
发布于
2022年7月13日
许可协议