一、Spark 1.X

spark 2.X开始,三者的关系发生了变化,可以参考《且谈Apache Spark的API三剑客:RDD、DataFrame和Dataset》 ,在2.X中DataFrame=DataSet[Row],其实是不知道类型。下面介绍是1.X,以免误导大家。

RDD、DataFrame和DataSet是容易产生混淆的概念,必须对其相互之间对比,才可以知道其中异同。

RDD和DataFrame


RDD-DataFrame

上图直观地体现了DataFrame和RDD的区别。左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构。而右侧的DataFrame却提供了详细的结构信息,使得Spark SQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。DataFrame多了数据的结构信息,即schema。RDD是分布式的Java对象的集合。DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化,比如filter下推、裁剪等。

提升执行效率

RDD API是函数式的,强调不变性,在大部分场景下倾向于创建新对象而不是修改老对象。这一特点虽然带来了干净整洁的API,却也使得Spark应用程序在运行期倾向于创建大量临时对象,对GC造成压力。在现有RDD API的基础之上,我们固然可以利用mapPartitions方法来重载RDD单个分片内的数据创建方式,用复用可变对象的方式来减小对象分配和GC的开销,但这牺牲了代码的可读性,而且要求开发者对Spark运行时机制有一定的了解,门槛较高。另一方面,Spark SQL在框架内部已经在各种可能的情况下尽量重用对象,这样做虽然在内部会打破了不变性,但在将数据返回给用户时,还会重新转为不可变数据。利用 DataFrame API进行开发,可以免费地享受到这些优化效果。

减少数据读取

分析大数据,最快的方法就是 ——忽略它。这里的“忽略”并不是熟视无睹,而是根据查询条件进行恰当的剪枝。

上文讨论分区表时提到的分区剪 枝便是其中一种——当查询的过滤条件中涉及到分区列时,我们可以根据查询条件剪掉肯定不包含目标数据的分区目录,从而减少IO。

对于一些“智能”数据格 式,Spark SQL还可以根据数据文件中附带的统计信息来进行剪枝。简单来说,在这类数据格式中,数据是分段保存的,每段数据都带有最大值、最小值、null值数量等 一些基本的统计信息。当统计信息表名某一数据段肯定不包括符合查询条件的目标数据时,该数据段就可以直接跳过(例如某整数列a某段的最大值为100,而查询条件要求a > 200)。

此外,Spark SQL也可以充分利用RCFile、ORC、Parquet等列式存储格式的优势,仅扫描查询真正涉及的列,忽略其余列的数据。

执行优化


人口数据分析示例

为了说明查询优化,我们来看上图展示的人口数据分析的示例。图中构造了两个DataFrame,将它们join之后又做了一次filter操作。如果原封不动地执行这个执行计划,最终的执行效率是不高的。因为join是一个代价较大的操作,也可能会产生一个较大的数据集。如果我们能将filter下推到 join下方,先对DataFrame进行过滤,再join过滤后的较小的结果集,便可以有效缩短执行时间。而Spark SQL的查询优化器正是这样做的。简而言之,逻辑查询计划优化就是一个利用基于关系代数的等价变换,将高成本的操作替换为低成本操作的过程。

得到的优化执行计划在转换成物 理执行计划的过程中,还可以根据具体的数据源的特性将过滤条件下推至数据源内。最右侧的物理执行计划中Filter之所以消失不见,就是因为溶入了用于执行最终的读取操作的表扫描节点内。

对于普通开发者而言,查询优化 器的意义在于,即便是经验并不丰富的程序员写出的次优的查询,也可以被尽量转换为高效的形式予以执行。

RDD和DataSet

  • DataSet以Catalyst逻辑执行计划表示,并且数据以编码的二进制形式被存储,不需要反序列化就可以执行sorting、shuffle等操作。

  • DataSet创立需要一个显式的Encoder,把对象序列化为二进制,可以把对象的scheme映射为Spark

SQl类型,然而RDD依赖于运行时反射机制。

通过上面两点,DataSet的性能比RDD的要好很多,可以参见[3]

DataFrame和DataSet

Dataset可以认为是DataFrame的一个特例,主要区别是Dataset每一个record存储的是一个强类型值而不是一个Row。因此具有如下三个特点:

  • DataSet可以在编译时检查类型

  • 并且是面向对象的编程接口。用wordcount举例:

//DataFrame// Load a text file and interpret each line as a java.lang.String
val ds = sqlContext.read.text("/home/spark/1.6/lines").as[String]
val result = ds  
    .flatMap(_.split(" "))// Split on whitespace
    .filter(_ !="")// Filter empty words
    .toDF()// Convert to DataFrame to perform aggregation / sorting
    .groupBy($"value")// Count number of occurences of each word
    .agg(count("*") as "numOccurances")
    .orderBy($"numOccurances" desc)// Show most common words first

//DataSet,完全使用scala编程,不要切换到DataFrame
val wordCount =   ds
.flatMap(_.split(" "))
.filter(_ !="")
.groupBy(_.toLowerCase())// Instead of grouping on a column expression (i.e. $"value") we pass a lambda function
.count()
  • 后面版本DataFrame会继承DataSet,DataFrame是面向Spark SQL的接口。

DataFrame和DataSet可以相互转化,df.as[ElementType]这样可以把DataFrame转化为DataSet,ds.toDF()这样可以把DataSet转化为DataFrame。


二、Spark 2.X

本文翻译自 A Tale of Three Apache Spark APIs: RDDs, DataFrames, and Datasets ,翻译已获得原作者 Jules S. Damji 的授权。

最令开发者们高兴的事莫过于有一组 API,可以大大提高开发者们的工作效率,容易使用、非常直观并且富有表现力。Apache Spark 广受开发者们欢迎的一个重要原因也在于它那些非常容易使用的 API,可以方便地通过多种语言,如 Scala、Java、Python 和 R 等来操作大数据集。

在本文中,我将深入讲讲 Apache Spark 2.2 以及以上版本提供的三种 API——RDD、DataFrame 和 Dataset,在什么情况下你该选用哪一种以及为什么,并概述它们的性能和优化点,列举那些应该使用 DataFrame 和 Dataset 而不是 RDD 的场景。我会更多地关注 DataFrame 和 Dataset,因为在 Apache Spark 2.0 中这两种 API 被整合起来了。

这次整合背后的动机在于我们希望可以让使用 Spark 变得更简单,方法就是减少你需要掌握的概念的数量,以及提供处理结构化数据的办法。在处理结构化数据时,Spark 可以像针对特定领域的语言所提供的能力一样,提供高级抽象和 API。

弹性分布式数据集(Resilient Distributed Dataset,RDD)

从一开始 RDD 就是 Spark 提供的面向用户的主要 API。从根本上来说,一个 RDD 就是你的数据的一个不可变的分布式元素集合,在集群中跨节点分布,可以通过若干提供了转换和处理的底层 API 进行并行处理。

在什么情况下使用 RDD?

下面是使用 RDD 的场景和常见案例:

  • 你希望可以对你的数据集进行最基本的转换、处理和控制;

  • 你的数据是非结构化的,比如流媒体或者字符流;

  • 你想通过函数式编程而不是特定领域内的表达来处理你的数据;

  • 你不希望像进行列式处理一样定义一个模式,通过名字或字段来处理或访问数据属性;

  • 你并不在意通过 DataFrame 和 Dataset 进行结构化和半结构化数据处理所能获得的一些优化和性能上的好处;

Apache Spark 2.0 中的 RDD 有哪些改变?

可能你会问:RDD 是不是快要降级成二等公民了?是不是快要退出历史舞台了?

答案是非常坚决的:不!

而且,接下来你还将了解到,你可以通过简单的 API 方法调用在 DataFrame 或 Dataset 与 RDD 之间进行无缝切换,事实上 DataFrame 和 Dataset 也正是基于 RDD 提供的。

DataFrame

与 RDD 相似, DataFrame 也是数据的一个不可变分布式集合。但与 RDD 不同的是,数据都被组织到有名字的列中,就像关系型数据库中的表一样。设计 DataFrame 的目的就是要让对大型数据集的处理变得更简单,它让开发者可以为分布式的数据集指定一个模式,进行更高层次的抽象。它提供了特定领域内专用的 API 来处理你的分布式数据,并让更多的人可以更方便地使用 Spark,而不仅限于专业的数据工程师。

在我们的 Apache Spark 2.0 网络研讨会以及后续的博客中,我们提到在Spark 2.0 中,DataFrame 和 Dataset 的 API 将融合到一起,完成跨函数库的数据处理能力的整合。在整合完成之后,开发者们就不必再去学习或者记忆那么多的概念了,可以通过一套名为 Dataset 的高级并且类型安全的 API 完成工作。


Dataset

如下面的表格所示,从 Spark 2.0 开始,Dataset 开始具有两种不同类型的 API 特征:有明确类型的 API 和无类型的 API。从概念上来说,你可以把 DataFrame 当作一些通用对象 Dataset[Row] 的集合的一个别名,而一行就是一个通用的无类型的 JVM 对象。与之形成对比,Dataset 就是一些有明确类型定义的 JVM 对象的集合,通过你在 Scala 中定义的 Case Class 或者 Java 中的 Class 来指定。

有类型和无类型的 API

语言 主要抽象
Scala Dataset[T] & DataFrame (Dataset[Row] 的别名)
Java Dataset[T]
Python DataFrame
R DataFrame

注意:因为 Python 和 R 没有编译时类型安全,所以我们只有称之为 DataFrame 的无类型 API。

Dataset API 的优点

在 Spark 2.0 里,DataFrame 和 Dataset 的统一 API 会为 Spark 开发者们带来许多方面的好处。

1、静态类型与运行时类型安全

从 SQL 的最小约束到 Dataset 的最严格约束,把静态类型和运行时安全想像成一个图谱。比如,如果你用的是 Spark SQL 的查询语句,要直到运行时你才会发现有语法错误(这样做代价很大),而如果你用的是 DataFrame 和 Dataset,你在编译时就可以捕获错误(这样就节省了开发者的时间和整体代价)。也就是说,当你在 DataFrame 中调用了 API 之外的函数时,编译器就可以发现这个错。不过,如果你使用了一个不存在的字段名字,那就要到运行时才能发现错误了。

图谱的另一端是最严格的 Dataset。因为 Dataset API 都是用 lambda 函数和 JVM 类型对象表示的,所有不匹配的类型参数都可以在编译时发现。而且在使用 Dataset 时,你的分析错误也会在编译时被发现,这样就节省了开发者的时间和代价。

所有这些最终都被解释成关于类型安全的图谱,内容就是你的 Spark 代码里的语法和分析错误。在图谱中,Dataset 是最严格的一端,但对于开发者来说也是效率最高的。


2、关于结构化和半结构化数据的高级抽象和定制视图

把 DataFrame 当成 Dataset[Row] 的集合,就可以对你的半结构化数据有了一个结构化的定制视图。比如,假如你有个非常大量的用 JSON 格式表示的物联网设备事件数据集。因为 JSON 是半结构化的格式,那它就非常适合采用 Dataset 来作为强类型化的 Dataset[DeviceIoTData] 的集合。

{"device_id": 198164, "device_name": "sensor-pad-198164owomcJZ", 
"ip": "80.55.20.25", "cca2": "PL", "cca3": "POL", "cn": "Poland", 
"latitude": 53.080000, "longitude": 18.620000, "scale": "Celsius", 
"temp": 21, "humidity": 65, "battery_level": 8, "c02_level": 1408, 
"lcd": "red", "timestamp" :1458081226051}

你可以用一个 Scala Case Class 来把每条 JSON 记录都表示为一条 DeviceIoTData,一个定制化的对象。

case class DeviceIoTData (battery_level: Long, c02_level: Long, cca2: String, cca3: String, cn: String, 
device_id: Long, device_name: String, humidity: Long, ip: String, latitude: Double, lcd: String, 
longitude: Double, scale:String, temp: Long, timestamp: Long)

接下来,我们就可以从一个 JSON 文件中读入数据。

// read the json file and create the dataset from the 
// case class DeviceIoTData
// ds is now a collection of JVM Scala objects DeviceIoTData
val ds = spark.read.json(“/databricks-public-datasets/data/iot/iot_devices.json”).as[DeviceIoTData]

上面的代码其实可以细分为三步:

  1. Spark 读入 JSON,根据模式创建出一个 DataFrame 的集合;

  2. 在这时候,Spark 把你的数据用“DataFrame = Dataset[Row]”进行转换,变成一种通用行对象的集合,因为这时候它还不知道具体的类型;

  3. 然后,Spark 就可以按照类 DeviceIoTData 的定义,转换出“Dataset[Row] -> Dataset[DeviceIoTData]”这样特定类型的 Scala JVM 对象了。

许多和结构化数据打过交道的人都习惯于用列的模式查看和处理数据,或者访问对象中的某个特定属性。将 Dataset 作为一个有类型的 Dataset[ElementType] 对象的集合,你就可以非常自然地又得到编译时安全的特性,又为强类型的 JVM 对象获得定制的视图。而且你用上面的代码获得的强类型的 Dataset[T] 也可以非常容易地用高级方法展示或处理。


3、方便易用的结构化 API

虽然结构化可能会限制你的 Spark 程序对数据的控制,但它却提供了丰富的语义,和方便易用的特定领域内的操作,后者可以被表示为高级结构。事实上,用 Dataset 的高级 API 可以完成大多数的计算。比如,它比用 RDD 数据行的数据字段进行 agg、select、sum、avg、map、filter 或 groupBy 等操作简单得多,只需要处理 Dataset 类型的 DeviceIoTData 对象即可。

用一套特定领域内的 API 来表达你的算法,比用 RDD 来进行关系代数运算简单得多。比如,下面的代码将用 filter() 和 map() 来创建另一个不可变 Dataset。

// Use filter(), map(), groupBy() country, and compute avg() 
// for temperatures and humidity. This operation results in 
// another immutable Dataset. The query is simpler to read, 
// and expressive
val dsAvgTmp = ds.filter(d => {d.temp > 25}).map(d => (d.temp, d.humidity, d.cca3))
    .groupBy($"_3").avg()//display the resulting datasetdisplay(dsAvgTmp)


4、性能与优化

除了上述优点之外,你还要看到使用 DataFrame 和 Dataset API 带来的空间效率和性能提升。原因有如下两点:

首先,因为 DataFrame 和 Dataset API 都是基于 Spark SQL 引擎构建的,它使用 Catalyst 来生成优化后的逻辑和物理查询计划。所有 R、Java、Scala 或 Python 的 DataFrame/Dataset API,所有的关系型查询的底层使用的都是相同的代码优化器,因而会获得空间和速度上的效率。尽管有类型的 Dataset[T] API 是对数据处理任务优化过的,无类型的 Dataset[Row](别名 DataFrame)却运行得更快,适合交互式分析。


其次, Spark 作为一个编译器,它可以理解 Dataset 类型的 JVM 对象,它会使用编码器来把特定类型的JVM 对象映射成Tungsten 的内部内存表示。结果,Tungsten 的编码器就可以非常高效地将JVM 对象序列化或反序列化,同时生成压缩字节码,这样执行效率就非常高了。

该什么时候使用 DataFrame 或 Dataset 呢?

  • 如果你需要丰富的语义、高级抽象和特定领域专用的 API,那就使用 DataFrame 或 Dataset;

  • 如果你的处理需要对半结构化数据进行高级处理,如 filter、map、aggregation、average、sum、SQL 查询、列式访问或使用 lambda 函数,那就使用 DataFrame 或 Dataset;

  • 如果你想在编译时就有高度的类型安全,想要有类型的 JVM 对象,用上 Catalyst 优化,并得益于 Tungsten 生成的高效代码,那就使用 Dataset;

  • 如果你想在不同的 Spark 库之间使用一致和简化的 API,那就使用 DataFrame 或 Dataset;

  • 如果你是 R 语言使用者,就用 DataFrame;

  • 如果你是 Python 语言使用者,就用 DataFrame,在需要更细致的控制时就退回去使用 RDD;

注意只需要简单地调用一下.rdd,就可以无缝地将 DataFrame 或 Dataset 转换成 RDD。例子如下:

// select specific fields from the Dataset, apply a predicate
// using the where() method, convert to an RDD, and show first 10
// RDD rows
val deviceEventsDS = ds.select($"device_name", $"cca3", $"c02_level")
    .where($"c02_level" > 1300)
// convert to RDDs and take the first 10 rows
val eventsRDD = deviceEventsDS.rdd.take(10)


总结

总之,在什么时候该选用 RDD、DataFrame 或 Dataset 看起来好像挺明显。前者可以提供底层的功能和控制,后者支持定制的视图和结构,可以提供高级和特定领域的操作,节约空间并快速运行。

当我们回顾从早期版本的 Spark 中获得的经验教训时,我们问自己该如何为开发者简化 Spark 呢?该如何优化它,让它性能更高呢?我们决定把底层的 RDD API 进行高级抽象,成为 DataFrame 和 Dataset,用它们在 Catalyst 优化器和 Tungsten 之上构建跨库的一致数据抽象。

DataFrame 和 Dataset,或 RDD API,按你的实际需要和场景选一个来用吧,当你像大多数开发者一样对数据进行结构化或半结构化的处理时,我不会有丝毫惊讶。

三、参考

且谈 Apache Spark 的 API 三剑客:RDD、DataFrame 和 Dataset

Spark SQL结构化分析

解读2015之Spark篇:新生态系统的形成

Introducing Spark Datasets

databricks example

Original Link: http://ibillxia.github.io/blog/2019/09/17/spark-rdd-dataframe-dataset/
Attribution - NON-Commercial - ShareAlike - Copyright © Bill Xia