5.2 RDD 的创建
在 Spark 中创建 RDD 的方式可以分为 3 种:
- 从集合中创建 RDD
- 从外部存储创建 RDD
- 从其他 RDD 转换得到新的 RDD。
5.2.1 从集合中创建 RDD
1. 使用parallelize
函数创建
scala> val arr = Array(10,20,30,40,50,60)
arr: Array[Int] = Array(10, 20, 30, 40, 50, 60)
scala> val rdd1 = sc.parallelize(arr)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
2. 使用makeRDD
函数创建
makeRDD
和parallelize
是一样的.
scala> val rdd1 = sc.makeRDD(Array(10,20,30,40,50,60))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24
说明:
一旦 RDD 创建成功, 就可以通过并行的方式去操作这个分布式的数据集了.
parallelize
和makeRDD
还有一个重要的参数就是把数据集切分成的分区数.Spark 会为每个分区运行一个任务(task). 正常情况下, Spark 会自动的根据你的集群来设置分区数
5.2.2 从外部存储创建 RDD
Spark 也可以从任意 Hadoop 支持的存储数据源来创建分布式数据集.
可以是本地文件系统, HDFS, Cassandra, HVase, Amazon S3 等等.
Spark 支持 文本文件, SequenceFiles, 和其他所有的 Hadoop InputFormat.
scala> var distFile = sc.textFile("words.txt")
distFile: org.apache.spark.rdd.RDD[String] = words.txt MapPartitionsRDD[1] at textFile at <console>:24
scala> distFile.collect
res0: Array[String] = Array(atguigu hello, hello world, how are you, abc efg)
说明:
url
可以是本地文件系统文件,hdfs://...
,s3n://...
等等如果是使用的
本地文件系统的路径, 则必须每个节点都要存在这个路径 所有基于文件的方法, 都支持目录, 压缩文件, 和通配符(
*
). 例如:textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").
textFile
还可以有第二个参数, 表示分区数. 默认情况下, 每个块对应一个分区.(对 HDFS 来说, 块大小默认是128M
). 可以传递一个大于块数的分区数, 但是不能传递一个比块数小的分区数.关于读取文件和保存文件的其他知识, 后面专门的章节介绍.
5.2.3 从其他 RDD 转换得到新的 RDD
就是通过 RDD 的各种转换算子来得到新的 RDD.
详见 5.3 节