5.2 RDD 的创建
在 Spark 中创建 RDD 的方式可以分为 3 种:
- 从集合中创建 RDD
- 从外部存储创建 RDD
- 从其他 RDD 转换得到新的 RDD。
5.2.1 从集合中创建 RDD
1. 使用parallelize函数创建
scala> val arr = Array(10,20,30,40,50,60)
arr: Array[Int] = Array(10, 20, 30, 40, 50, 60)
scala> val rdd1 = sc.parallelize(arr)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
2. 使用makeRDD函数创建
makeRDD和parallelize是一样的.
scala> val rdd1 = sc.makeRDD(Array(10,20,30,40,50,60))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24
说明:
一旦 RDD 创建成功, 就可以通过并行的方式去操作这个分布式的数据集了.
parallelize和makeRDD还有一个重要的参数就是把数据集切分成的分区数.Spark 会为每个分区运行一个任务(task). 正常情况下, Spark 会自动的根据你的集群来设置分区数
5.2.2 从外部存储创建 RDD
Spark 也可以从任意 Hadoop 支持的存储数据源来创建分布式数据集.
可以是本地文件系统, HDFS, Cassandra, HVase, Amazon S3 等等.
Spark 支持 文本文件, SequenceFiles, 和其他所有的 Hadoop InputFormat.
scala> var distFile = sc.textFile("words.txt")
distFile: org.apache.spark.rdd.RDD[String] = words.txt MapPartitionsRDD[1] at textFile at <console>:24
scala> distFile.collect
res0: Array[String] = Array(atguigu hello, hello world, how are you, abc efg)
说明:
url可以是本地文件系统文件,hdfs://...,s3n://...等等如果是使用的
本地文件系统的路径, 则必须每个节点都要存在这个路径 所有基于文件的方法, 都支持目录, 压缩文件, 和通配符(
*). 例如:textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").textFile还可以有第二个参数, 表示分区数. 默认情况下, 每个块对应一个分区.(对 HDFS 来说, 块大小默认是128M). 可以传递一个大于块数的分区数, 但是不能传递一个比块数小的分区数.关于读取文件和保存文件的其他知识, 后面专门的章节介绍.
5.2.3 从其他 RDD 转换得到新的 RDD
就是通过 RDD 的各种转换算子来得到新的 RDD.
详见 5.3 节