Flink作为目前代表性的流式计算框架之一,在大数据领域的应用开始呈现出越来越广泛地趋势,尤其是针对于流计算、批处理等方向,Flink能够实现低延迟的数据计算,给大数据应用场景拓宽了更多的可能性。今天,我们主要为大家分享一下Flink实例关于算子编程方面的示例。
Flink Transformation,涵盖多个算子,可以实现对Flink数据流进行处理和转化,是Flink流处理非常核心的API。Flink的Transformation转换主要包括四种:单数据流基本转换、基于Key的分组转换、多数据流转换和数据重分布转换。
Flink框架当中,使用算子,需要在算子上进行用户自定义操作,通常会采用Lambda表达式或者继承模板类并重写函数两种方式完成这个用户自定义的过程。以map为例:
map算子对一个DataStream中的每个元素使用用户自定义的map函数进行处理,每个输入元素对应一个输出元素,最终整个数据流被转换成一个新的DataStream。输出的数据流DataStream[OUT]类型可能和输入的数据流DataStream[IN]不同。
我们可以重写MapFunction或RichMapFunction来自定义map函数,RichMapFunction的定义为:RichMapFunction[IN,OUT],其内部有一个map虚函数,我们需要对这个虚函数重写。
val dataStream:DataStream[Int]=senv.fromElements(1,2,-3,0,5,-9,8)
//继承RichMapFunction
//第一个泛型是输入类型,第二个参数是泛型类型
class DoubleMapFunction extends RichMapFunction[Int,String]{
override def map(input:Int):String=
("overide map Input:"+input.toString+",Output:"+(input*2).toString)
}
val richFunctionDataStream=dataStream.map{new DoubleMapFunction()}
上面的代码清单重写了RichMapFunction中的map函数,将输入结果乘以2,转化为字符串后输出。我们也可以不用显示定义DoubleMapFunction这个类,而是使用匿名类:
//匿名类
val anonymousDataStream=dataStream.map{new RichMapFunction[Int,String]{
override def map(input:Int):String={
("overide mapInput:"+input.toString+",Output:"+(input*2).toString)
}
}}
自定义map函数最简便的操作是使用Lambda表达式。
//使用=>构造Lambda表达式
val lambda=dataStream.map(input=>(input*2).toDouble)
上面的代码清单中,我们对某整数数据流进行操作,输入元素均为Int,输出元素均为Double。
也可以使用下划线来构造Lambda表达式:
//使用_构造Lambda表达式
val lambda2=dataStream.map{_.toDouble*2}
注意,使用Scala进行Flink编程,自定义算子时可以使用圆括号(),也可以使用花括号{}。
对上面的几种方式比较可见,Lambda表达式更为简洁,但是可读性差,其他人不容易读懂代码逻辑。重写函数的方式代码更为臃肿,但定义更清晰。此外,RichFunction还提供了一系列其他方法,包括open、close、getRuntimeContext和setRuntimeContext等虚函数方法,重写这些方法可以创建状态数据、对数据进行广播,获取累加器和计数器等。
关于Flink实例,以上就是一map算子为例进行的一点简单的介绍了。Map算子是各个大数据计算引擎都会用到的一个算子,因此也算是基础入门阶段必要的练习之一了,大家可以熟练掌握。成都加米谷大数据,大数据技术分享,
大数据培训班课程,更多详情可联系客服了解!