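As a representative framework for offline big data processing, Spark still holds an important position in the market, and anyone learning big data needs to invest considerable time and effort in it. Today's post looks at Spark Streaming programming within the Spark framework.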
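Spark Streaming Programming Steps
A Spark Streaming program generally involves the following steps:
1. Create a StreamingContext
2. Create an input DStream to define the input source
3. Define the processing logic by applying transformations and output operations to the DStream
4. Call streamingContext.start() to start receiving and processing data
5. Call streamingContext.awaitTermination() to wait for processing to finish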
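import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StartSparkStreaming {
  def main(args: Array[String]): Unit = {
    // local[2]: one thread for the receiver, one for processing
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("Streaming")
    // 1. Create the StreamingContext with a 5-second batch interval
    val ssc = new StreamingContext(conf, Seconds(5))
    // Silence Spark and Hadoop logging so the results are easy to read
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
    // 2. Create the input DStream
    val lines = ssc.socketTextStream("localhost", 9999)
    // 3. Define the stream-processing logic
    val count = lines.flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    // 4. Output the results
    count.print()
    // 5. Start receiving and processing data
    ssc.start()
    // 6. Wait for the computation to terminate
    ssc.awaitTermination()
  }
}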
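Transformations and Output Operations
DStreams are immutable: their contents cannot be modified directly. Instead, the desired application logic is expressed as a series of transformations on DStreams. Each transformation creates a new DStream that represents the transformed data from its parent DStream. Transformations are lazy: they are executed only once an output operation has been invoked, and the operations that trigger execution are called output operations.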
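The sketch below illustrates the immutability and laziness described above. It assumes the same socket source on localhost:9999 used earlier; the object name LazyLineage and the helper toWords are made up for illustration and are not part of Spark.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object LazyLineage {
  // Hypothetical helper: split a line into non-empty, lower-cased words.
  def toWords(line: String): Seq[String] =
    line.split(" ").filter(_.nonEmpty).map(_.toLowerCase).toSeq

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("LazyLineage")
    val ssc = new StreamingContext(conf, Seconds(5))

    val lines = ssc.socketTextStream("localhost", 9999)
    // Each transformation returns a new DStream; `lines` is left unchanged.
    val words = lines.flatMap(toWords)
    val longWords = words.filter(_.length > 4)

    // Up to here only the lineage has been declared; no job runs for it yet.
    // Registering an output operation is what causes each batch to be computed.
    longWords.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

If the print() call is removed, no output operation is registered and ssc.start() should fail with an error saying there is nothing to execute.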
Transformations
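Spark Streaming provides a rich set of transformations, which fall into two groups: stateful and stateless. It also provides window operations, which, it is worth noting, are stateful as well. The details are as follows:
Stateless transformations
A stateless transformation is one in which each micro-batch is processed independently, so the current result is not affected by earlier results. Most Spark Streaming operators are stateless, for example the common map(), flatMap(), and reduceByKey().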
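To make "processed independently" concrete without starting Spark, here is a plain-Scala model in which each micro-batch is an ordinary Seq of lines. It is only an analogy: StatelessModel and wordCount are illustrative names, not Spark API.

```scala
object StatelessModel {
  // Word count over a single micro-batch; it reads nothing from earlier batches.
  def wordCount(batch: Seq[String]): Map[String, Int] =
    batch
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size }

  def main(args: Array[String]): Unit = {
    // Two micro-batches arriving one after the other.
    val batches = Seq(Seq("spark spark flink"), Seq("spark"))
    // The same function is applied to every batch in isolation,
    // so the second result does not include the counts from the first.
    batches.map(wordCount).foreach(println)
  }
}
```

In this model the second batch reports a count of 1 for spark, not 3, because nothing is carried over from the first batch.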
map(func)
Applies the function func to each element of the source DStream and returns a new DStream.
/** Return a new DStream by applying a function to all elements of this DStream. */
def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
  new MappedDStream(this, context.sparkContext.clean(mapFunc))
}
flatMap(func)
Similar to map, but each input item can be mapped to zero or more output items.
/**
 * Return a new DStream by applying a function to all elements of this DStream,
 * and then flattening the results
 */
def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope {
  new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}
filter(func)
Returns a new DStream containing only the items of the source DStream for which func returns true.
/** Return a new DStream containing only the elements that satisfy a predicate. */
def filter(filterFunc: T => Boolean): DStream[T] = ssc.withScope {
  new FilteredDStream(this, context.sparkContext.clean(filterFunc))
}
repartition(numPartitions)
Changes the level of parallelism of the DStream by creating more or fewer partitions.
/**
 * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the
 * returned DStream has exactly numPartitions partitions.
 */
def repartition(numPartitions: Int): DStream[T] = ssc.withScope {
  this.transform(_.repartition(numPartitions))
}
reduce(func)
Aggregates the elements of each RDD in the source DStream using the function func, and returns a new DStream of single-element RDDs.
/**
 * Return a new DStream in which each RDD has a single element generated by reducing each RDD
 * of this DStream.
 */
def reduce(reduceFunc: (T, T) => T): DStream[T] = ssc.withScope {
  this.map((null, _)).reduceByKey(reduceFunc, 1).map(_._2)
}
count()
Counts the number of elements in each RDD of the source DStream.
/**
 * Return a new DStream in which each RDD has a single element generated by counting each RDD
 * of this DStream.
 */
def count(): DStream[Long] = ssc.withScope {
  this.map(_ => (null, 1L))
    .transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1)))
    .reduceByKey(_ + _)
    .map(_._2)
}
union(otherStream)
Returns a new DStream containing the elements of both the source DStream and the other DStream.
/**
 * Return a new DStream by unifying data of another DStream with this DStream.
 * @param that Another DStream having the same slideDuration as this DStream.
 */
def union(that: DStream[T]): DStream[T] = ssc.withScope {
  new UnionDStream[T](Array(this, that))
}
countByValue()
Applied to a DStream whose elements are of type K, returns a new DStream of (K, Long) key-value pairs in which the value of each key is the number of times it occurs in each RDD of the source DStream. For example, with lines.flatMap(_.split(" ")).countByValue().print(), the input spark spark flink produces (spark,2),(flink,1): elements are grouped by value, and the size of each group is counted.
As the source code shows, the underlying implementation is map((_, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions): each element is first mapped to a tuple whose key is the element itself, and the tuples are then aggregated by key.
/**
 * Return a new DStream in which each RDD contains the counts of each distinct value in
 * each RDD of this DStream. Hash partitioning is used to generate
 * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
 * `numPartitions` not specified).
 */
def countByValue(numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null)
    : DStream[(T, Long)] = ssc.withScope {
  this.map((_, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
}
reduceByKey(func, [numTasks])
When called on a DStream of (K, V) key-value pairs, returns a new DStream of (K, V) pairs in which the values for each key are aggregated using the given reduce function (func).
For example: lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _).print()
For the input spark spark flink, the output is (spark,2),(flink,1).
/**
 * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
 * merged using the associative and commutative reduce function. Hash partitioning is used to
 * generate the RDDs with Spark's default number of partitions.
 */
def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = ssc.withScope {
  reduceByKey(reduceFunc, defaultPartitioner())
}
join(otherStream, [numTasks])
When applied to two DStreams, one of (K, V) pairs and one of (K, W) pairs, returns a new DStream of (K, (V, W)) pairs.
/**
 * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
 * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
 */
def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = ssc.withScope {
  join[W](other, defaultPartitioner())
}
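As a rough usage sketch for join, the program below joins per-batch word counts with word lengths. It assumes the same socket source on localhost:9999 used earlier; JoinExample and withLength are illustrative names, not Spark API.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object JoinExample {
  // Hypothetical helper: pair a word with its length, giving the (K, W) side.
  def withLength(word: String): (String, Int) = (word, word.length)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("JoinExample")
    val ssc = new StreamingContext(conf, Seconds(5))
    val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

    // (K, V): each word with its count in the current batch.
    val counts = words.map((_, 1)).reduceByKey(_ + _)
    // (K, W): each distinct word with its length.
    val lengths = words.map(withLength).transform(_.distinct())

    // (K, (V, W)): joined batch by batch on the word.
    counts.join(lengths).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

If the line spark spark flink arrives within a single batch, that batch should contain (spark,(2,5)) and (flink,(1,5)); the order in which they are printed is not guaranteed.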
cogroup(otherStream, [numTasks])
When applied to two DStreams, one of (K, V) pairs and one of (K, W) pairs, returns a new DStream of (K, (Iterable[V], Iterable[W])) tuples.
// Input: spark
// Output: (spark,(CompactBuffer(1),CompactBuffer(1)))
val DS1 = lines.flatMap(_.split(" ")).map((_,1))
val DS2 = lines.flatMap(_.split(" ")).map((_,1))
DS1.cogroup(DS2).print()
/**
 * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
 * Hash partitioning is used to generate the RDDs with Spark's default number
 * of partitions.
 */
def cogroup[W: ClassTag](
    other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))] = ssc.withScope {
  cogroup(other, defaultPartitioner())
}
transform(func)
Creates a new DStream by applying an RDD-to-RDD function to every RDD of the source DStream. This makes it possible to apply arbitrary RDD operations when building the new DStream.
// Input: spark spark flink
// Output: (spark,2), (flink,1)
val lines = ssc.socketTextStream("localhost", 9999)
val resultDStream = lines.transform(rdd => {
  rdd.flatMap(_.split("\\W")).map((_, 1)).reduceByKey(_ + _)
})
resultDStream.print()
/**
 * Return a new DStream in which each RDD is generated by applying a function
 * on each RDD of 'this' DStream.
 */
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = ssc.withScope {
  val cleanedF = context.sparkContext.clean(transformFunc, false)
  transform((r: RDD[T], _: Time) => cleanedF(r))
}
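Stateful transformations
A stateful transformation is one in which micro-batches are not processed independently: processing the current micro-batch depends on the results of earlier micro-batches. Common stateful transformations include countByValueAndWindow, reduceByKeyAndWindow, mapWithState, and updateStateByKey. In fact, all window-based operations are stateful, because they must keep track of the data across the whole window.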
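A minimal sketch of two of the stateful operators named above, updateStateByKey and reduceByKeyAndWindow, might look like this. It assumes the same socket source on localhost:9999; the checkpoint path is a placeholder, and RunningWordCount and updateTotal are illustrative names.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RunningWordCount {
  // Merge this batch's counts for a key into the total carried over from earlier batches.
  def updateTotal(newCounts: Seq[Int], total: Option[Int]): Option[Int] =
    Some(newCounts.sum + total.getOrElse(0))

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("RunningWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))
    // updateStateByKey needs a checkpoint directory; this path is a placeholder.
    ssc.checkpoint("/tmp/spark-streaming-checkpoint")

    val pairs = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map(word => (word, 1))

    // Stateful: the total for each word accumulates across micro-batches.
    val totals = pairs.updateStateByKey[Int](updateTotal _)
    totals.print()

    // Windowed: counts over the last 30 seconds, recomputed every 10 seconds.
    // Both durations must be multiples of the 5-second batch interval.
    val windowed = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
    windowed.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Keeping the update logic in a plain function such as updateTotal makes it easy to check on its own, for example that two new occurrences added to a previous total of 3 give 5.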
Output Operations
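Output operations write a DStream to external storage systems or print it to the console. As mentioned above, Spark Streaming transformations are lazy, so an output operation is needed to trigger the computation; its role is similar to that of an action on an RDD.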
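The following sketch shows three common output operations side by side: print, saveAsTextFiles, and foreachRDD. It assumes the same socket source on localhost:9999; the output prefix /tmp/wordcount/batch is a placeholder, and OutputOperations and formatRecord are illustrative names. A real sink would replace the println call with writes to a database or message queue.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

object OutputOperations {
  // Format one (word, count) record as a tab-separated line.
  def formatRecord(record: (String, Int)): String = s"${record._1}\t${record._2}"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("OutputOperations")
    val ssc = new StreamingContext(conf, Seconds(5))
    val counts = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)

    // Print the first elements of every batch to the console.
    counts.print()

    // Save every batch as text files; the prefix below is a placeholder path.
    counts.map(formatRecord).saveAsTextFiles("/tmp/wordcount/batch", "txt")

    // Run arbitrary code against each batch's RDD.
    counts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
      rdd.foreachPartition { records =>
        // A real sink would typically open one connection per partition here.
        records.foreach(r => println(s"$time ${formatRecord(r)}"))
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```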
That covers the basics of this introductory Spark Streaming programming example. As Spark's stream-processing component, Spark Streaming is a key part of learning Spark, and working to understand it while practicing often is the quickest way to master it.