
Big Data Learning: A Spark Streaming Programming Primer

Author: Zhang (instructor)  2021-04-21 17:17
As a representative framework for large-scale offline data processing, Spark still holds an important position in the market today, and learning big data means investing a fair amount of time and effort in the Spark framework. In today's big data learning installment, we look at Spark Streaming programming within the Spark framework.


Spark Streaming Programming Steps

A Spark Streaming program generally involves the following steps:

1. Create a StreamingContext

2. Create an input DStream to define the input source

3. Define the processing logic by applying transformations and output operations to the DStream

4. Call streamingContext.start() to begin receiving and processing data

5. Call streamingContext.awaitTermination() to wait for the processing to finish

  import org.apache.log4j.{Level, Logger}
  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  object StartSparkStreaming {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf()
        .setMaster("local[2]")   // at least 2 threads: one for the receiver, one for processing
        .setAppName("Streaming")
      // 1. Create the StreamingContext with a 5-second batch interval
      val ssc = new StreamingContext(conf, Seconds(5))
      Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
      Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
      // 2. Create the input DStream from a TCP socket source
      val lines = ssc.socketTextStream("localhost", 9999)
      // 3. Define the stream-processing logic: a per-batch word count
      val count = lines.flatMap(_.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)
      // 4. Output the result of each batch
      count.print()
      // 5. Start the computation
      ssc.start()
      // 6. Wait for it to terminate
      ssc.awaitTermination()
    }
  }
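
To try the example, start a simple text server in another terminal with nc -lk 9999 and type a few words; every 5-second batch then prints its word counts to the console.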

Transformations and Output Operations

DStreams are immutable, which means their contents cannot be changed directly; instead, the intended application logic is expressed as a series of transformations on a DStream. Each transformation creates a new DStream that represents the transformed data of its parent DStream. DStream transformations are lazy: they are only executed once an output operation is invoked, and these triggering operations are called output operations.

Transformations

Spark Streaming provides a rich set of transformations, which fall into stateless and stateful transformations. In addition, Spark Streaming provides window operations; note that window operations are themselves stateful. The details are as follows:

Stateless transformations

A stateless transformation processes each micro-batch independently: the result of the current batch is not affected by the results of earlier batches. Most Spark Streaming operators are stateless, for example the common map(), flatMap(), reduceByKey(), and so on.

map(func)

Returns a new DStream obtained by applying the function func to each element of the source DStream.

    /** Return a new DStream by applying a function to all elements of this DStream. */
    def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
      new MappedDStream(this, context.sparkContext.clean(mapFunc))
    }
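
As a minimal usage sketch (assuming lines is the socket DStream from the opening example; the mapping itself is only illustrative):

    // Map each input line to its length.
    val lineLengths = lines.map(line => line.length)
    lineLengths.print()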

flatMap(func)

Similar to map, but each input item can be mapped to zero or more output items.

  /**
   * Return a new DStream by applying a function to all elements of this DStream,
   * and then flattening the results
   */
  def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope {
    new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
  }

filter(func)

Returns a new DStream containing only the elements of the source DStream for which func returns true.

  /** Return a new DStream containing only the elements that satisfy a predicate. */
  def filter(filterFunc: T => Boolean): DStream[T] = ssc.withScope {
    new FilteredDStream(this, context.sparkContext.clean(filterFunc))
  }
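
For example, a small sketch on the same lines DStream that keeps only the lines containing the word "error" (the predicate is illustrative):

    // Keep only the lines that contain "error".
    val errorLines = lines.filter(line => line.contains("error"))
    errorLines.print()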

repartition(numPartitions)

Changes the level of parallelism of the DStream by creating more or fewer partitions.

  /**
   * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the
   * returned DStream has exactly numPartitions partitions.
   */
  def repartition(numPartitions: Int): DStream[T] = ssc.withScope {
    this.transform(_.repartition(numPartitions))
  }
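
A one-line sketch (the partition count of 4 is illustrative); repartitioning is usually applied right after receiving data so that later transformations run with the desired parallelism:

    // Spread the received data across 4 partitions before further processing.
    val repartitioned = lines.repartition(4)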


reduce(func)

Aggregates the elements of each RDD in the source DStream using the function func, returning a new DStream of single-element RDDs.

  /**
   * Return a new DStream in which each RDD has a single element generated by reducing each RDD
   * of this DStream.
   */
  def reduce(reduceFunc: (T, T) => T): DStream[T] = ssc.withScope {
    this.map((null, _)).reduceByKey(reduceFunc, 1).map(_._2)
  }
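
As a sketch on the same lines DStream (the comparison logic is illustrative), reduce can pick the longest line of each batch:

    // For each batch, keep the longest line.
    val longestLine = lines.reduce((a, b) => if (a.length >= b.length) a else b)
    longestLine.print()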

count()

Counts the number of elements in each RDD of the source DStream.

  /**
   * Return a new DStream in which each RDD has a single element generated by counting each RDD
   * of this DStream.
   */
  def count(): DStream[Long] = ssc.withScope {
    this.map(_ => (null, 1L))
        .transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1)))
        .reduceByKey(_ + _)
        .map(_._2)
  }
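
For example, counting how many lines arrive in each batch (a sketch on the same lines DStream):

    // Number of input lines per batch.
    val lineCount = lines.count()
    lineCount.print()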

union(otherStream)

Returns a new DStream containing the elements of both the source DStream and the other DStream.

  /**
   * Return a new DStream by unifying data of another DStream with this DStream.
   * @param that Another DStream having the same slideDuration as this DStream.
   */
  def union(that: DStream[T]): DStream[T] = ssc.withScope {
    new UnionDStream[T](Array(this, that))
  }
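
A minimal sketch, assuming a second socket source on port 9998 (the extra source is purely illustrative):

    // Merge two text streams with the same batch interval into one DStream.
    val moreLines = ssc.socketTextStream("localhost", 9998)
    val allLines = lines.union(moreLines)
    allLines.print()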

countByValue()

When applied to a DStream whose elements have type K, returns a new DStream of (K, Long) pairs, where the value for each key is the number of times it occurs in each RDD of the source DStream. For example, lines.flatMap(_.split(" ")).countByValue().print() turns the input spark spark flink into the output (spark,2), (flink,1): the elements are grouped by value and the size of each group is counted.

As the source code shows, the underlying implementation is map((_,1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions): each element is first mapped to a tuple whose key is the element itself, and the tuples are then aggregated by key.

  /**
   * Return a new DStream in which each RDD contains the counts of each distinct value in
   * each RDD of this DStream. Hash partitioning is used to generate
   * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
   * `numPartitions` not specified).
   */
  def countByValue(numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null)
      : DStream[(T, Long)] = ssc.withScope {
    this.map((_, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
  }

reduceByKey(func, [numTasks])

When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs in which the values for each key are aggregated using the given reduce function func.

For example: lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _).print()

For the input spark spark flink, this prints (spark,2), (flink,1).

  /**
   * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
   * merged using the associative and commutative reduce function. Hash partitioning is used to
   * generate the RDDs with Spark's default number of partitions.
   */
  def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = ssc.withScope {
    reduceByKey(reduceFunc, defaultPartitioner())
  }

join(otherStream, [numTasks])

When called on two DStreams, one of (K, V) pairs and one of (K, W) pairs, returns a new DStream of (K, (V, W)) pairs.

  /**
   * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
   */
  def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = ssc.withScope {
    join[W](other, defaultPartitioner())
  }
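
A minimal sketch that joins two word-pair streams built from the same lines DStream (using the same stream twice is only for illustration):

    // Input:  spark
    // Output: (spark,(1,1))
    val left = lines.flatMap(_.split(" ")).map((_, 1))
    val right = lines.flatMap(_.split(" ")).map((_, 1))
    left.join(right).print()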

cogroup(otherStream, [numTasks])

When called on two DStreams, one of (K, V) pairs and one of (K, W) pairs, returns a new DStream of (K, (Iterable[V], Iterable[W])) tuples, grouping the values for each key from both streams.

  // Input:  spark
  // Output: (spark,(CompactBuffer(1),CompactBuffer(1)))
  val DS1 = lines.flatMap(_.split(" ")).map((_, 1))
  val DS2 = lines.flatMap(_.split(" ")).map((_, 1))
  DS1.cogroup(DS2).print()
  /**
   * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
   * Hash partitioning is used to generate the RDDs with Spark's default number
   * of partitions.
   */
  def cogroup[W: ClassTag](
      other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))] = ssc.withScope {
    cogroup(other, defaultPartitioner())
  }

transform(func)

Creates a new DStream by applying an RDD-to-RDD function to each RDD of the source DStream. Any RDD operation can be used inside the function.

  // Input:  spark spark flink
  // Output: (spark,2), (flink,1)
  val lines = ssc.socketTextStream("localhost", 9999)
  val resultDStream = lines.transform(rdd => {
    rdd.flatMap(_.split("\\W")).map((_, 1)).reduceByKey(_ + _)
  })
  resultDStream.print()
  /**
   * Return a new DStream in which each RDD is generated by applying a function
   * on each RDD of 'this' DStream.
   */
  def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = ssc.withScope {
    val cleanedF = context.sparkContext.clean(transformFunc, false)
    transform((r: RDD[T], _: Time) => cleanedF(r))
  }

Stateful transformations

A stateful transformation does not process each micro-batch independently: processing the current micro-batch depends on the results of earlier micro-batches. Common stateful transformations include countByValueAndWindow, reduceByKeyAndWindow, mapWithState, updateStateByKey, and so on. In fact, all window-based operations are stateful, because they have to track data across the entire window.
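
As a minimal sketch of a stateful word count with updateStateByKey, assuming the ssc and lines from the opening example (the checkpoint path is illustrative; stateful operations require checkpointing):

    // Stateful operators need a checkpoint directory.
    ssc.checkpoint("/tmp/streaming-checkpoint")

    // Add this batch's counts to the running total kept per word.
    val updateFunc: (Seq[Int], Option[Int]) => Option[Int] =
      (newValues, runningCount) => Some(newValues.sum + runningCount.getOrElse(0))

    val totalCounts = lines.flatMap(_.split(" "))
      .map((_, 1))
      .updateStateByKey(updateFunc)

    totalCounts.print()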

Output Operations

Output operations write a DStream's data to external storage systems or print it to the console. As mentioned above, Spark Streaming transformations are lazy, so an output operation is needed to trigger the computation, much like an action on an RDD.
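
Besides print(), common output operations include saveAsTextFiles and foreachRDD. A sketch continuing with the count DStream from the opening example (the output prefix and the sink are illustrative):

    // Write each batch as text files under the given prefix.
    count.saveAsTextFiles("output/wordcount")

    // Push each batch to an external system through its RDD.
    count.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // Replace println with a real sink such as a database or message queue.
        partition.foreach(record => println(record))
      }
    }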

That covers the basics of Spark Streaming programming for big data learners. Spark Streaming is an important component of Spark's stream processing and a key part of the curriculum; understanding it well and practicing often is the fastest way to master it. Chengdu Jiamigu Big Data is a professional big data training provider covering big data development and data analysis and mining; a beginner class is enrolling this month, and the syllabus and trial lessons are available from customer service.