Flink Source Code Reading (3): Checkpoint Fault Tolerance and Recovery

Continuing from the previous article, Flink Source Code Reading (2): How Checkpoints Are Produced. Here are the four questions raised there:

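  • Why is there a checkpoint even when there is only one input stream? And if there is, how is the checkpoint snapshot generated?
  • Suppose a record fails to be persisted to disk. Can checkpoints support recovery from that failure?
  • Does the consistency guaranteed by checkpoints refer to the consistency of the state, or to the consistency of the data?
  • What do the "buffers" mentioned here refer to at the source-code level?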

This article addresses question two: if a record fails to be persisted to disk, can checkpoints support recovery from the failure?

From the previous article, we already know the basic flow of a single checkpoint:

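1. Prepare the checkpoint, allowing operators to do some pre-barrier work.
2. Send the checkpoint barrier downstream.
3. Prepare to spill the in-flight buffers for input and output as part of the snapshot.
4. Take the state snapshot. This is largely asynchronous, so it does not hold up the progress of the streaming topology.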
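The sketch below makes the order of these four steps concrete using a toy word-count operator. It is written in Java and is not Flink code. Every name in it (`SubtaskCheckpointSketch`, `process`, `checkpoint`) is hypothetical. It relies on two assumptions taken from the steps above. First, the barrier is forwarded before the snapshot is taken. Second, the expensive part of the snapshot runs asynchronously. The state is copied synchronously at the barrier, so records that arrive after the barrier cannot leak into the snapshot. Spilling in-flight buffers (step 3) is not modeled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical toy operator, not Flink code. Only the task thread touches liveState and downstream.
class SubtaskCheckpointSketch {
    final Map<String, Integer> liveState = new HashMap<>(); // word -> count
    final List<String> downstream = new ArrayList<>();      // records and barriers sent downstream

    void process(String word) {
        liveState.merge(word, 1, Integer::sum);
        downstream.add(word);
    }

    CompletableFuture<Map<String, Integer>> checkpoint(long checkpointId, ExecutorService asyncPool) {
        // Step 1: prepare. This toy operator has no pre-barrier work.
        // Step 2: forward the barrier downstream before snapshotting.
        downstream.add("barrier-" + checkpointId);
        // Step 3: spilling in-flight buffers (unaligned checkpoints) is not modeled here.
        // Step 4a (synchronous): copy the state exactly as it is at the barrier.
        Map<String, Integer> copy = new HashMap<>(liveState);
        // Step 4b (asynchronous): stand-in for writing the copy to durable storage off the task thread.
        return CompletableFuture.supplyAsync(() -> Map.copyOf(copy), asyncPool);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        SubtaskCheckpointSketch task = new SubtaskCheckpointSketch();
        task.process("a");
        task.process("b");
        CompletableFuture<Map<String, Integer>> snapshot = task.checkpoint(1L, pool);
        // Records after the barrier keep flowing while the snapshot is being persisted.
        task.process("a");
        task.process("c");
        System.out.println("snapshot = " + snapshot.join());
        System.out.println("live     = " + task.liveState);
        System.out.println("output   = " + task.downstream);
        pool.shutdown();
    }
}
```

The snapshot contains only the counts from before the barrier (`a` and `b` once each). The live state has already moved on to two `a`s and one `c`.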

As before, the reference is the Flink 1.11 official documentation, "Safe Stream Processing": ci.apache.org/projects/fl…

Triggering a checkpoint (Starting Checkpoint)

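1. Run some eager pre-checks while holding the CheckpointCoordinator's lock, which protects checkpoint updates.
2. Register the checkpoint with the JobMaster: obtain a checkpoint ID and storage location, then create a PendingCheckpoint.
3. When the job is cancelled, stop the checkpoints that are still waiting to be scheduled.
4. Snapshot the master state and trigger the checkpoints of the operator coordinators. Once both have finished, continue asynchronously with the checkpoint registered on the JobMaster.
5. Take the checkpoint snapshot (snapshotTaskState), which sends the checkpoint trigger to the tasks.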
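Steps 1 to 3 are mostly bookkeeping: a lock-guarded pre-check, allocation of an ID, registration of a pending checkpoint, and aborting pending work on cancellation. Below is a minimal Java model of that bookkeeping. `CheckpointBookkeepingSketch`, `maxConcurrentCheckpoints`, `complete` and `shutdown` are illustrative names, not Flink API. The concurrency limit is only an assumed example of a pre-check. The real startTriggeringCheckpoint holds the lock only for the eager pre-checks and creates the pending checkpoint later on the `timer` thread. This sketch collapses both into one critical section for brevity.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the trigger-side bookkeeping; none of these names are Flink APIs.
class CheckpointBookkeepingSketch {
    private final Object lock = new Object();
    private final int maxConcurrentCheckpoints;
    // All fields below are guarded by `lock`.
    private final Map<Long, String> pendingCheckpoints = new TreeMap<>(); // id -> storage location
    private long nextCheckpointId = 1;
    private boolean isShutdown = false;

    CheckpointBookkeepingSketch(int maxConcurrentCheckpoints) {
        this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
    }

    /** Steps 1 and 2: pre-check under the lock, then allocate an ID and register the pending checkpoint. */
    long trigger() {
        synchronized (lock) {
            // Step 1: pre-checks. Holding the lock means no other thread can change
            // the pending set between the check and the registration below.
            if (isShutdown) {
                throw new IllegalStateException("coordinator is shut down");
            }
            if (pendingCheckpoints.size() >= maxConcurrentCheckpoints) {
                throw new IllegalStateException("too many concurrent checkpoints");
            }
            // Step 2: allocate an ID and a (fake) storage location, then register it as pending.
            long id = nextCheckpointId++;
            pendingCheckpoints.put(id, "chk-" + id);
            return id;
        }
    }

    /** Called once every task has acknowledged: the checkpoint is no longer pending. */
    void complete(long id) {
        synchronized (lock) {
            pendingCheckpoints.remove(id);
        }
    }

    /** Step 3: on cancellation, abort whatever is still pending and refuse new triggers. */
    int shutdown() {
        synchronized (lock) {
            isShutdown = true;
            int aborted = pendingCheckpoints.size();
            pendingCheckpoints.clear();
            return aborted;
        }
    }

    int pendingCount() {
        synchronized (lock) {
            return pendingCheckpoints.size();
        }
    }

    public static void main(String[] args) {
        CheckpointBookkeepingSketch coordinator = new CheckpointBookkeepingSketch(2);
        System.out.println(coordinator.trigger()); // 1
        System.out.println(coordinator.trigger()); // 2
        try {
            coordinator.trigger();
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage()); // rejected: too many concurrent checkpoints
        }
        coordinator.complete(1);
        System.out.println(coordinator.trigger()); // 3
        System.out.println("aborted on shutdown: " + coordinator.shutdown()); // aborted on shutdown: 2
    }
}
```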

CheckpointCoordinator#startTriggeringCheckpoint

private void startTriggeringCheckpoint(
        long timestamp,
        CheckpointProperties props,
        @Nullable String externalSavepointLocation,
        boolean isPeriodic,
        boolean advanceToEndOfTime,
        CompletableFuture<CompletedCheckpoint> onCompletionPromise) {
    try {
        // make some eager pre-checks
        // (while holding the CheckpointCoordinator's lock, which protects checkpoint updates)
        synchronized (lock) {
            preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
        }
        final Execution[] executions = getTriggerExecutions();
        final Map<ExecutionAttemptID, ExecutionVertex> ackTasks = getAckTasks();
        // we will actually trigger this checkpoint!
        Preconditions.checkState(!isTriggering);
        isTriggering = true;
        // register the checkpoint: obtain the checkpoint ID and storage location,
        // then create the PendingCheckpoint on the timer thread
        final CompletableFuture<PendingCheckpoint> pendingCheckpointCompletableFuture =
            initializeCheckpoint(props, externalSavepointLocation)
                .thenApplyAsync(
                    (checkpointIdAndStorageLocation) -> createPendingCheckpoint(
                        timestamp,
                        props,
                        ackTasks,
                        isPeriodic,
                        checkpointIdAndStorageLocation.checkpointId,
                        checkpointIdAndStorageLocation.checkpointStorageLocation,
                        onCompletionPromise),
                    timer);
        // once the pending checkpoint exists, snapshot the master state (master hooks)
        final CompletableFuture<?> masterStatesComplete = pendingCheckpointCompletableFuture
            .thenCompose(this::snapshotMasterState);
        // in parallel, trigger the checkpoints of the operator coordinators
        final CompletableFuture<?> coordinatorCheckpointsComplete = pendingCheckpointCompletableFuture
            .thenComposeAsync((pendingCheckpoint) ->
                    // trigger the coordinator checkpoints and acknowledge them to the pending checkpoint
                    OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
                        coordinatorsToCheckpoint, pendingCheckpoint, timer),
                timer);
        // when both have completed, continue asynchronously on the timer thread
        CompletableFuture.allOf(masterStatesComplete, coordinatorCheckpointsComplete)
            .whenCompleteAsync(
                (ignored, throwable) -> {
                    final PendingCheckpoint checkpoint =
                        FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
                    if (throwable == null && checkpoint != null && !checkpoint.isDiscarded()) {
                        // no exception, no discarding, everything is OK
                        // take the task snapshots: send the checkpoint trigger to the executions
                        snapshotTaskState(
                            timestamp,
                            checkpoint.getCheckpointId(),
                            checkpoint.getCheckpointStorageLocation(),
                            props,
                            executions,
                            advanceToEndOfTime);
                        onTriggerSuccess();
                    } else {
                        // the initialization might not be finished yet
                        if (checkpoint == null) {
                            onTriggerFailure(onCompletionPromise, throwable);
                        } else {
                            onTriggerFailure(checkpoint, throwable);
                        }
                    }
                },
                timer);
    } catch (Throwable throwable) {
        onTriggerFailure(onCompletionPromise, throwable);
    }
}
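Structurally, startTriggeringCheckpoint is a CompletableFuture pipeline. One future produces the pending checkpoint, and two dependent futures start once it exists. An `allOf` join then decides between triggering the tasks and failing. The sketch below reproduces only that shape with plain `java.util.concurrent`, to show how a failure in any stage reaches the final handler. `TriggerPipelineSketch`, `Pending`, the failure flags and the result strings are all hypothetical. `getWithoutException` is a local helper that mirrors how Flink's FutureUtils.getWithoutException is used in the code above: it returns the value if the future completed normally, and null otherwise.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model of the future composition in startTriggeringCheckpoint; not Flink code.
class TriggerPipelineSketch {

    /** Stand-in for PendingCheckpoint. */
    static final class Pending {
        final long id;
        volatile boolean discarded;

        Pending(long id) {
            this.id = id;
        }
    }

    /** Returns the value if the future completed normally, otherwise null (local helper). */
    static <T> T getWithoutException(CompletableFuture<T> future) {
        if (future.isDone() && !future.isCompletedExceptionally()) {
            return future.join();
        }
        return null;
    }

    /** Unwraps CompletionException layers to reach the original failure message. */
    static String rootMessage(Throwable t) {
        while (t instanceof CompletionException && t.getCause() != null) {
            t = t.getCause();
        }
        return t == null ? "no exception" : t.getMessage();
    }

    /** Runs the pipeline once; the two flags inject a failure into one stage. */
    static String trigger(long id, boolean initFails, boolean masterFails, ExecutorService timer) {
        CompletableFuture<String> outcome = new CompletableFuture<>();

        // ~ initializeCheckpoint(...).thenApplyAsync(createPendingCheckpoint, timer)
        CompletableFuture<Pending> pendingFuture = CompletableFuture
                .supplyAsync(() -> {
                    if (initFails) {
                        throw new IllegalStateException("init failed");
                    }
                    return id;
                }, timer)
                .thenApplyAsync(checkpointId -> new Pending(checkpointId), timer);

        // ~ snapshotMasterState: runs after the pending checkpoint has been created
        CompletableFuture<Void> masterStates = pendingFuture.thenAccept(pending -> {
            if (masterFails) {
                throw new IllegalStateException("master hook failed");
            }
        });

        // ~ operator coordinator checkpoints: also starts from the pending checkpoint
        CompletableFuture<Void> coordinators = pendingFuture.thenAcceptAsync(pending -> { }, timer);

        // ~ allOf(...).whenCompleteAsync(...): either trigger the tasks or fail
        CompletableFuture.allOf(masterStates, coordinators).whenCompleteAsync((ignored, throwable) -> {
            Pending pending = getWithoutException(pendingFuture);
            if (throwable == null && pending != null && !pending.discarded) {
                outcome.complete("triggered-" + pending.id); // ~ snapshotTaskState + onTriggerSuccess
            } else if (pending == null) {
                // initialization failed, so there is no pending checkpoint to abort
                outcome.complete("failed-before-pending: " + rootMessage(throwable));
            } else {
                pending.discarded = true; // ~ abort the pending checkpoint
                outcome.complete("discarded-" + pending.id + ": " + rootMessage(throwable));
            }
        }, timer);

        return outcome.join();
    }

    public static void main(String[] args) {
        ExecutorService timer = Executors.newSingleThreadExecutor();
        System.out.println(trigger(1, false, false, timer)); // triggered-1
        System.out.println(trigger(2, true, false, timer));  // failed-before-pending: init failed
        System.out.println(trigger(3, false, true, timer));  // discarded-3: master hook failed
        timer.shutdown();
    }
}
```

The two failure cases correspond to the two `onTriggerFailure` branches in the real method. If initialization fails, no pending checkpoint exists yet (`checkpoint == null`). A failure in a later stage leaves a pending checkpoint behind, and that checkpoint has to be aborted.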

To be continued… QwQ

稀土掘金