主页 > 新闻资讯 > 大数据学习:Flink Rescale实现流程

大数据学习:Flink Rescale实现流程

作者:张老师 浏览次数: 2021-04-26 17:13
前面对于Flink Rescale做了简单的基础讲解,Flink Rescale机制,其实简单来说,就为为了确保运行过程中,作业更改后状态的一致性,确保系统的稳定运行。今天的大数据学习分享,我们就主要来讲讲Flink Rescale实现流程。

通常来说,Flink Rescale,大致可以分为三个阶段:做快照、停止作业、恢复作业。这里的代码以flink-1.10.0作为参考。我们先介绍关键类,再介绍如何制作快照、如何恢复快照。

大数据学习:Flink Rescale实现流程

关键类

CheckpointCoordinator

这个是位于Master节点的快照控制中心,负责定期的触发checkpoint和手动触发savepoint,维护在做和已完成的快照。

StateAssignmentOperation

这个是位于Master节点的作业恢复时负责rescale的类,主要是根据新作业的并发重新分配状态。针对operator state,主要采用broadcast的方式使得每个task都能接触算子全部的状态;针对key state,采用均分KG的方法来重新划分state的归属。

TaskStateManagerImpl

具体Task的状态管理中心,包括和JobMaster做checkpoint的交互,管理本地状态。

StreamTaskStateInitializerImpl

具体task的状态恢复,这里也是各个statebackend开始创建的地方。

RocksDBKeyedStateBackend

具体task通过rocksdb做key state的地方。使用这种backend,每个状态是一个cf,主键的组织形式为<KG,key,namespace>。支持增量快照和全量快照。

HeapKeyedStateBackend

具体task通过rocksdb做key state的地方。使用这种backend,底层通过使用CopyOnWriteStateMap来存储,主键的组织形式为<NS,K,SV>。相比rocksdb,内存的存取速度都非常快,但是状态大小受制于内存。

如何制作快照

CheckpointCoordinator::triggerCheckpoint()。这个是checkpoint和savepoint共同的入口函数,checkpoint是通过定时调度来做的,savepoint则需要人工触发。这里头会做一些控制检查,没有问题的话就会向source task发送制作快照通知。

Execution::triggerCheckpointHelper()。通知source task对应的节点做快照。

TaskExecutor::triggerCheckpoint()。快照通知到TaskManager。

Task::triggerCheckpointBarrier()。进到具体task里。

StreamTask::performCheckpoint()。新版本采用了mailBox模型来解决持锁竞争问题。这里会首先下发barrier,然后开始本地快照。

CheckpointingOperation::executeCheckpointing()。进行同步快照(checkpointStreamOperator方法)和异步快照(AsyncCheckpointRunnable类)。

StreamOperator::snapshotState()→AbstractStreamOperator::snapshotState()。同步快照的制作,主要保存KeyedStateRaw、OperatorStateRaw、OperatorStateManaged、KeyedStateManaged等状态。

AsyncCheckpointRunnable::run()。异步快照的制作。

CheckpointResponder::acknowledgeCheckpoint()。快照做完后汇报给主节点。

针对非source节点,需要上游的barrier对齐后才能触发快照,这点跟source task略有不同,如下所示:

CheckpointedInputGate::pollNext()。从输入里头获取barrier。

CheckpointBarrierAligner::processBarrier()。处理barrier,负责exactly-once快照的处理。另一个类似类CheckpointBarrierTracker则负责at-least-once快照的处理。

CheckpointBarrierAligner::notifyCheckpoint()。如果barrier都到齐了,那么开始制作快照。

StreamTask::triggerCheckpointOnBarrier()。进到task里,之后的流程就如上面所述了。

如何恢复快照

Master端的分配

主要的状态分配逻辑都在类StateAssignmentOperation里。这里先明确几个概念:

ExecutionJobVertex,表示一个逻辑上的执行节点,可能是好几个operator通过chain连到一起的。

Execution,对应ExecutionJobVertex的一个并发执行,也是由好几个operator通过chain连到一起的。

OperatorState,表示一个operator的所有并发的状态。

OperatorID,一个operator的唯一标识。

KeyGroupRange,表示一个subtask所负责的KG范围。

状态。包括ManagedOperatorStates、RawOperatorStates、ManagedKeyedState和RawKeyedState。

TaskStateSnapshot,一个Execution所有operator的状态。这里的处理逻辑是,对ExecutionJobVertex的所有operator做状态分配,对operator的所有subtask做状态分配。基本流程如下:

检查并发是否符合要求,主要是确保设置并发不要超过最大并发等。

计算每个subtask负责的KeyGroupRange,下面根据这个标准来分配KG。

重新分配operatorState,主要在reDistributePartitionableStates里实现,这里头对unionState进行了合并,按照round-bin的方式来分配list的state。

重新分配keyedState,主要在reDistributeKeyedStates里实现,这里头会具体到subtask里,从之前的快照里找到所有属于它的stateHandler。

将分配好的状态赋值给ExecutionJobVertices。这里会以Execution为基本单位,设置它的JobManagerTaskRestore(由多个operator的状态组成)。

Task端的恢复

当状态都分配好了之后,在Task端就可以进行状态恢复了。大概流程如下:

TaskStateManagerImpl::prioritizedOperatorState(),将对应operator的状态(OperatorSubtaskState)拿出来,最后存到PrioritizedOperatorSubtaskState里。

StreamTaskStateInitializerImpl::streamOperatorStateContext,初始化keyedStatedBackend、operatorStateBackend、timeServiceManager等过程。

BackendRestorerProcedure::createAndRestore(),这个是在初始化keyedStatedBackend的时候调用的,将状态保存到了keyedStatedBackend中。

RestoreOperation::restore(),针对历史状态进行恢复。这是个抽象函数,比如一个具体的实现是RocksDBFullRestoreOperation::restore()。

之后根据具体的stateHandle进行恢复。

关于大数据学习,Flink Rescale实现流程,以上就为大家做了简单的介绍了。Flink Rescale机制,在实际运行当中,算是比较细节的部分,有时间可以结合到源码去深入理解一下。成都加米谷大数据,专业大数据培训机构,大数据开发,数据分析与挖掘,零基础班本月正在招生中,课程大纲及预约试听,可联系客服获取!
热点排行
推荐文章
立即申请>>