主页 > 新闻资讯 > 大数据学习:如何创建Spark RDD

大数据学习:如何创建Spark RDD

作者:张老师 浏览次数: 2020-12-15 16:44
对Spark稍有了解的人都知道,Spark核心的数据结构,是弹性分布式数据集RDD,RDD作为Spark对数据的核心抽象,在编程任务当中,往往需要用到。今天的大数据学习分享,我们就主要来讲讲,如何创建Spark RDD。

事实上,RDD是 Spark 对数据的核心抽象,也是最关键的抽象,它实质上是一组分布式的 JVM 不可变对象集合,不可变决定了它是只读的,所以 RDD 在经过变换产生新的 RDD 时,原有 RDD 不会改变。

大数据学习:如何创建Spark RDD

Spark编程可以通过已有的 SparkSession 直接创建 RDD。创建RDD的方式有以下几类:

通过并行集合创建RDD;

从HDFS中加载数据创建RDD;

从linux本地文件系统加载数据创建RDD。

1、通过并行集合创建RDD

这种 RDD 纯粹是为了学习,将内存中的集合变量转换为 RDD,没太大实际意义。

//val spark: SparkSession = .......

val rdd = spark.sparkcontext.parallelize(Seq(1, 2, 3))

从HDFS中加载数据创建RDD

这种生成 RDD 的方式是非常常用的:

//val spark: SparkSession = .......

val rdd = spark.sparkcontext.textFile("hdfs://namenode:8020/user/me/wiki.txt")

2、从HDFS中加载数据创建RDD

Spark 从 MySQL 中读取数据返回的 RDD 类型是 JdbcRDD,顾名思义,是基于 JDBC 读取数据的,这点与 Sqoop 是相似的,但不同的是 JdbcRDD 必须手动指定数据的上下界,也就是以 MySQL 表某一列的最值作为切分分区的依据。

//val spark: SparkSession = .......

val lowerBound = 1

val upperBound = 1000

val numPartition = 10

val rdd = new JdbcRDD(spark.sparkcontext,() => {

       Class.forName("com.mysql.jdbc.Driver").newInstance()

       DriverManager.getConnection("jdbc:mysql://localhost:3306/db", "root", "123456")

   },

   "SELECT content FROM mysqltable WHERE ID >= ? AND ID <= ?",

   lowerBound,

   upperBound,

   numPartition,

   r => r.getString(1)

)

既然是基于 JDBC 进行读取,那么所有支持 JDBC 的数据库都可以通过这种方式进行读取,也包括支持 JDBC 的分布式数据库。

上面介绍的是通过 JDBC 读取数据库的方式,对于 HBase 这种分布式数据库来说,情况有些不同,HBase 这种分布式数据库,在数据存储时也采用了分区的思想,HBase 的分区名为 Region,那么基于 Region 进行导入这种方式的性能就会比上面那种方式快很多,是真正的并行导入。

//val spark: SparkSession = .......

val sc = spark.sparkcontext

val tablename = "your_hbasetable"

val conf = HBaseConfiguration.create()

conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")

conf.set("hbase.zookeeper.property.clientPort", "2181")

conf.set(TableInputFormat.INPUT_TABLE, tablename)

val rdd= sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],

classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],

classOf[org.apache.hadoop.hbase.client.Result])

// 利用HBase API解析出行键与列值

rdd_three.foreach{case (_,result) => {

    val rowkey = Bytes.toString(result.getRow)

    val value1 = Bytes.toString(result.getValue("cf".getBytes,"c1".getBytes))

}

值得一提的是 HBase 有一个第三方组件叫 Phoenix,可以让 HBase 支持 SQL 和 JDBC,在这个组件的配合下,第一种方式也可以用来抽取 HBase 的数据,此外,Spark 也可以读取 HBase 的底层文件 HFile,从而直接绕过 HBase 读取数据。

通过第三方库的支持,Spark 几乎能够读取所有的数据源,例如 Elasticsearch,所以你如果要尝试的话,尽量选用 Maven 来管理依赖。

关于大数据学习,如何创建Spark RDD,以上就为大家做了详细的介绍了。Spark RDD在进行编程的基础,而RDD的创建,有多种方式,具体还是根据自己的需要来选择。成都加米谷大数据,专业大数据培训机构,大数据开发、数据分析与挖掘,零基础班本月正在招生中,课程大纲及试学视频,可联系客服获取!
热点排行
推荐文章
立即申请>>