对Spark稍有了解的人都知道,Spark核心的数据结构,是弹性分布式数据集RDD,RDD作为Spark对数据的核心抽象,在编程任务当中,往往需要用到。今天的大数据学习分享,我们就主要来讲讲,如何创建Spark RDD。
事实上,RDD是 Spark 对数据的核心抽象,也是最关键的抽象,它实质上是一组分布式的 JVM 不可变对象集合,不可变决定了它是只读的,所以 RDD 在经过变换产生新的 RDD 时,原有 RDD 不会改变。
Spark编程可以通过已有的 SparkSession 直接创建 RDD。创建RDD的方式有以下几类:
通过并行集合创建RDD;
从HDFS中加载数据创建RDD;
从linux本地文件系统加载数据创建RDD。
1、通过并行集合创建RDD
这种 RDD 纯粹是为了学习,将内存中的集合变量转换为 RDD,没太大实际意义。
//val spark: SparkSession = .......
val rdd = spark.sparkcontext.parallelize(Seq(1, 2, 3))
从HDFS中加载数据创建RDD
这种生成 RDD 的方式是非常常用的:
//val spark: SparkSession = .......
val rdd = spark.sparkcontext.textFile("hdfs://namenode:8020/user/me/wiki.txt")
2、从HDFS中加载数据创建RDD
Spark 从 MySQL 中读取数据返回的 RDD 类型是 JdbcRDD,顾名思义,是基于 JDBC 读取数据的,这点与 Sqoop 是相似的,但不同的是 JdbcRDD 必须手动指定数据的上下界,也就是以 MySQL 表某一列的最值作为切分分区的依据。
//val spark: SparkSession = .......
val lowerBound = 1
val upperBound = 1000
val numPartition = 10
val rdd = new JdbcRDD(spark.sparkcontext,() => {
Class.forName("com.mysql.jdbc.Driver").newInstance()
DriverManager.getConnection("jdbc:mysql://localhost:3306/db", "root", "123456")
},
"SELECT content FROM mysqltable WHERE ID >= ? AND ID <= ?",
lowerBound,
upperBound,
numPartition,
r => r.getString(1)
)
既然是基于 JDBC 进行读取,那么所有支持 JDBC 的数据库都可以通过这种方式进行读取,也包括支持 JDBC 的分布式数据库。
上面介绍的是通过 JDBC 读取数据库的方式,对于 HBase 这种分布式数据库来说,情况有些不同,HBase 这种分布式数据库,在数据存储时也采用了分区的思想,HBase 的分区名为 Region,那么基于 Region 进行导入这种方式的性能就会比上面那种方式快很多,是真正的并行导入。
//val spark: SparkSession = .......
val sc = spark.sparkcontext
val tablename = "your_hbasetable"
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set(TableInputFormat.INPUT_TABLE, tablename)
val rdd= sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
// 利用HBase API解析出行键与列值
rdd_three.foreach{case (_,result) => {
val rowkey = Bytes.toString(result.getRow)
val value1 = Bytes.toString(result.getValue("cf".getBytes,"c1".getBytes))
}
值得一提的是 HBase 有一个第三方组件叫 Phoenix,可以让 HBase 支持 SQL 和 JDBC,在这个组件的配合下,第一种方式也可以用来抽取 HBase 的数据,此外,Spark 也可以读取 HBase 的底层文件 HFile,从而直接绕过 HBase 读取数据。
通过第三方库的支持,Spark 几乎能够读取所有的数据源,例如 Elasticsearch,所以你如果要尝试的话,尽量选用 Maven 来管理依赖。
关于大数据学习,如何创建Spark RDD,以上就为大家做了详细的介绍了。Spark RDD在进行编程的基础,而RDD的创建,有多种方式,具体还是根据自己的需要来选择。成都加米谷大数据,专业
大数据培训机构,大数据开发、数据分析与挖掘,零基础班本月正在招生中,课程大纲及试学视频,可联系客服获取!