Flink 1.10 Job Submission Flow Analysis (Part 1)

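The most common way to submit a Flink job is the flink run command. If we want to implement job submission ourselves through the API, we first need to understand what flink run does. This article walks through the source code to analyze its submission flow. (Note: the analysis is based on version 1.10.1.)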

Submission Entry Point

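The bin/flink script shows that the entry class for submission is org.apache.flink.client.cli.CliFrontend, and the arguments passed to it are whatever follows the flink command. Here is its main method: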

public static void main(final String[] args) {
    EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

    // 1. Resolve the configuration directory, normally $FLINK_HOME/conf
    final String configurationDirectory = getConfigurationDirectoryFromEnv();

    // 2. Load flink-conf.yaml
    final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);

    // 3. Initialize the argument parsers for all submission modes
    final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(
        configuration,
        configurationDirectory);

    try {
        // Create the CLI entry point
        final CliFrontend cli = new CliFrontend(
            configuration,
            customCommandLines);

        SecurityUtils.install(new SecurityConfiguration(cli.configuration));

        // parseParameters dispatches on the action (run, info, list, modify, ...) and runs the matching flow
        int retCode = SecurityUtils.getInstalledContext()
            .runSecured(() -> cli.parseParameters(args));
        System.exit(retCode);
    }
    catch (Throwable t) {
        final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
        LOG.error("Fatal error while running command line interface.", strippedThrowable);
        strippedThrowable.printStackTrace();
        System.exit(31);
    }
}
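Step 2 loads flink-conf.yaml. As far as I can tell from the 1.10 sources, GlobalConfiguration does not run a full YAML parser here: it reads the file line by line, drops everything after `#`, and splits each remaining line on the first `": "` into a key and a value. The following is a minimal sketch of that idea over an in-memory string; `FlatConfParser` is a hypothetical name, and Flink's logging of malformed lines is replaced by silently skipping them.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, simplified reader for flat "key: value" files in the style of flink-conf.yaml. */
public final class FlatConfParser {

    public static Map<String, String> parse(String content) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String line : content.split("\\r?\\n")) {
            // Drop a comment: everything after the first '#'.
            String stripped = line.split("#", 2)[0].trim();
            if (stripped.isEmpty()) {
                continue;
            }
            // Split on the first ": " only, so the value may itself contain ": ".
            String[] kv = stripped.split(": ", 2);
            if (kv.length != 2 || kv[0].trim().isEmpty() || kv[1].trim().isEmpty()) {
                continue; // malformed line: skipped here (Flink logs a warning instead)
            }
            conf.put(kv[0].trim(), kv[1].trim());
        }
        return conf;
    }

    public static void main(String[] args) {
        String sample = String.join("\n",
                "# JobManager settings",
                "jobmanager.rpc.address: localhost",
                "jobmanager.rpc.port: 6123   # RPC port",
                "",
                "execution.target: remote");
        System.out.println(parse(sample));
    }
}
```

Because values are split only on the first `": "`, a value such as `v: w` survives intact; on the other hand, a `#` inside a value would be treated as the start of a comment.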

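CustomCommandLine is the interface for parsing command-line arguments. Its implementations are FlinkYarnSessionCli and DefaultCLI: FlinkYarnSessionCli parses the options for per-job and session mode, and DefaultCLI parses the options for standalone mode. The client picks the parser that fits the options passed in: each parser's isActive method checks whether it matches, and the selected parser's applyCommandLineOptionsToConfiguration method then turns the options into configuration.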
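The selection is an ordered first-match search. In the 1.10 sources, loadCustomCommandLines registers FlinkYarnSessionCli before DefaultCLI, and DefaultCLI's isActive always returns true, so it only wins when nothing earlier claims the arguments. The sketch below models just that ordering. `CliParser` is a hypothetical stand-in for CustomCommandLine, and its YARN check is reduced to the `-m yarn-cluster` case; the real FlinkYarnSessionCli.isActive considers other inputs as well.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public final class CliSelectionSketch {

    /** Hypothetical stand-in for a CustomCommandLine: a name plus an isActive check. */
    static final class CliParser {
        final String name;
        final Predicate<Map<String, String>> isActive;

        CliParser(String name, Predicate<Map<String, String>> isActive) {
            this.name = name;
            this.isActive = isActive;
        }
    }

    /** Returns the first parser, in registration order, whose isActive check passes. */
    static CliParser selectActive(List<CliParser> parsers, Map<String, String> options) {
        for (CliParser parser : parsers) {
            if (parser.isActive.test(options)) {
                return parser;
            }
        }
        throw new IllegalStateException("No active command line parser found");
    }

    /** Registration order matters: the always-active fallback must come last. */
    static List<CliParser> defaultParsers() {
        return List.of(
                // Simplified YARN check: only looks at "-m yarn-cluster".
                new CliParser("yarn", opts -> "yarn-cluster".equals(opts.get("-m"))),
                // Always-active fallback, in the role DefaultCLI plays.
                new CliParser("default", opts -> true));
    }

    public static void main(String[] args) {
        System.out.println(selectActive(defaultParsers(), Map.of("-m", "yarn-cluster")).name); // yarn
        System.out.println(selectActive(defaultParsers(), Map.of()).name);                     // default
    }
}
```

Putting the fallback first would make it shadow every other parser, which is why the order of registration, not just the isActive logic, decides the outcome.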

The run Flow

protected void run(String[] args) throws Exception {
    LOG.info("Running 'run' command.");

    // Options accepted by the run command (including the savepoint-restore options)
    final Options commandOptions = CliFrontendParser.getRunCommandOptions();

    // Parse the arguments into a CommandLine
    final CommandLine commandLine = getCommandLine(commandOptions, args, true);

    // ProgramOptions holds the jar path, the program's entry class, program arguments, classpaths, etc.
    final ProgramOptions programOptions = new ProgramOptions(commandLine);

    // Help flag
    if (commandLine.hasOption(HELP_OPTION.getOpt())) {
        CliFrontendParser.printHelpForRun(customCommandLines);
        return;
    }

    if (!programOptions.isPython()) {
        // Java program should be specified a JAR file
        if (programOptions.getJarFilePath() == null) {
            throw new CliArgsException("Java program should be specified a JAR file.");
        }
    }

    // The program to run: jar, arguments, and related information
    final PackagedProgram program;
    try {
        LOG.info("Building program from JAR file");
        program = buildProgram(programOptions);
    }
    catch (FileNotFoundException e) {
        throw new CliArgsException("Could not build the program from JAR file.", e);
    }

    // Jars the program needs, mainly the user jar
    final List<URL> jobJars = program.getJobJarAndDependencies();

    // Effective configuration, produced by the active command-line parser
    final Configuration effectiveConfiguration =
        getEffectiveConfiguration(commandLine, programOptions, jobJars);

    LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

    try {
        executeProgram(effectiveConfiguration, program);
    } finally {
        program.deleteExtractedLibraries();
    }
}

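In getEffectiveConfiguration, a parser is selected according to the arguments. For example, per-job mode is requested with -m yarn-cluster, which selects the FlinkYarnSessionCli parser. One important setting produced in this step is execution.target, the target executor. It determines which type of executor will later submit the job (yarn-session, yarn-per-job, or remote), and its value is likewise set according to the submission mode.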
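One way to picture getEffectiveConfiguration is as layered key/value maps: start from a copy of flink-conf.yaml, let the active parser write its options (including execution.target) on top, then add program-level settings such as the job jars. The sketch below is a simplification under those assumptions: plain `Map`s stand in for Flink's Configuration, the `Mode` enum is hypothetical, and the jar list is stored under `pipeline.jars` as a `;`-joined string purely for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public final class EffectiveConfigSketch {

    /** Hypothetical submission modes, each mapped to one of the execution.target values named above. */
    enum Mode {
        YARN_PER_JOB("yarn-per-job"),
        YARN_SESSION("yarn-session"),
        STANDALONE("remote");

        final String target;

        Mode(String target) {
            this.target = target;
        }
    }

    static Map<String, String> effectiveConfiguration(
            Map<String, String> flinkConf, Mode mode, Map<String, String> cliOverrides, List<String> jobJars) {
        // Layer 1: a copy of flink-conf.yaml, so the global configuration itself is not mutated.
        Map<String, String> effective = new LinkedHashMap<>(flinkConf);
        // Layer 2: options from the active parser win over the file, including the executor target.
        effective.putAll(cliOverrides);
        effective.put("execution.target", mode.target);
        // Layer 3: program-level settings such as the user jar(s).
        effective.put("pipeline.jars", String.join(";", jobJars));
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> conf = effectiveConfiguration(
                Map.of("parallelism.default", "1", "execution.target", "remote"),
                Mode.YARN_PER_JOB,
                Map.of("parallelism.default", "4"),
                List.of("file:/tmp/my-job.jar"));
        System.out.println(conf);
    }
}
```

Copying before overlaying is the important design point: the same global configuration can serve several submissions without one job's CLI options leaking into the next.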

Executing the Program

The executeProgram method simply delegates to ClientUtils.executeProgram:

public static void executeProgram(
        PipelineExecutorServiceLoader executorServiceLoader,
        Configuration configuration,
        PackagedProgram program) throws ProgramInvocationException {

    checkNotNull(executorServiceLoader);
    final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
    final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    try {
        Thread.currentThread().setContextClassLoader(userCodeClassLoader);

        LOG.info("Starting program (detached: {})", !configuration.getBoolean(DeploymentOptions.ATTACHED));

        // Factory that creates the execution context for the user program
        ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
            executorServiceLoader,
            configuration,
            userCodeClassLoader);

        // Stores the factory in ExecutionEnvironment's context-factory fields
        ContextEnvironment.setAsContext(factory);

        try {
            // Invoke the user program's main method
            program.invokeInteractiveModeForExecution();
        } finally {
            ContextEnvironment.unsetContext();
        }
    } finally {
        Thread.currentThread().setContextClassLoader(contextClassLoader);
    }
}
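The try/finally around setContextClassLoader is a standard Java pattern: the user program (and any library that resolves classes through the thread context class loader) sees the user-code loader while main runs, and the client thread gets its original loader back even if main throws. Here is the same pattern in isolation; `runWithContextClassLoader` is a hypothetical helper name, and an empty URLClassLoader stands in for the user-code class loader.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

public final class ContextClassLoaderSketch {

    /** Runs the action with the given context class loader and always restores the previous one. */
    static <T> T runWithContextClassLoader(ClassLoader loader, Supplier<T> action) {
        final Thread thread = Thread.currentThread();
        final ClassLoader previous = thread.getContextClassLoader();
        try {
            thread.setContextClassLoader(loader);
            return action.get();
        } finally {
            // Restored even when the action throws, like the outer finally in ClientUtils.executeProgram.
            thread.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) throws IOException {
        // An empty URLClassLoader stands in for the loader built from the job jar.
        try (URLClassLoader userLoader = new URLClassLoader(new URL[0],
                ContextClassLoaderSketch.class.getClassLoader())) {
            boolean seenInside = runWithContextClassLoader(userLoader,
                    () -> Thread.currentThread().getContextClassLoader() == userLoader);
            boolean restored = Thread.currentThread().getContextClassLoader() != userLoader;
            System.out.println("inside: " + seenInside + ", restored: " + restored);
        }
    }
}
```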

PipelineExecutorServiceLoader is used to select the executor; see the previous article, "Flink 1.10: Factory-Pattern-Based Job Submission and the SPI Mechanism".

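ContextEnvironmentFactory creates the program's execution context, an ExecutionEnvironment. You can think of it as encapsulating how the program interacts with the outside world, for example whether it runs in per-job or standalone mode and how many resources it needs; the type of environment it creates also determines which StreamExecutionEnvironment gets built (explained below). For submission through the client, the ExecutionEnvironment created is a ContextEnvironment.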

Invoking main

program.invokeInteractiveModeForExecution invokes the user program's main method. Inside main, the user code calls StreamExecutionEnvironment.getExecutionEnvironment to obtain the appropriate StreamExecutionEnvironment:

// StreamExecutionEnvironment.java
public static StreamExecutionEnvironment getExecutionEnvironment() {
    // StreamExecutionEnvironment's own threadLocalContextEnvironmentFactory and contextEnvironmentFactory
    // are both null by default, so createStreamExecutionEnvironment is called
    return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
        .map(StreamExecutionEnvironmentFactory::createExecutionEnvironment)
        .orElseGet(StreamExecutionEnvironment::createStreamExecutionEnvironment);
}

private static StreamExecutionEnvironment createStreamExecutionEnvironment() {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    if (env instanceof ContextEnvironment) {
        return new StreamContextEnvironment((ContextEnvironment) env);
    } else if (env instanceof OptimizerPlanEnvironment) {
        return new StreamPlanEnvironment(env);
    } else {
        return createLocalEnvironment();
    }
}

// ExecutionEnvironment.java
public static ExecutionEnvironment getExecutionEnvironment() {
    return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
        .map(ExecutionEnvironmentFactory::createExecutionEnvironment)
        // No context factory set (local mode): create a LocalEnvironment
        .orElseGet(ExecutionEnvironment::createLocalEnvironment);
}

As shown in ClientUtils.executeProgram, ContextEnvironment.setAsContext(factory) assigns the factory to ExecutionEnvironment's threadLocalContextEnvironmentFactory and contextEnvironmentFactory. ExecutionEnvironment.getExecutionEnvironment therefore calls ContextEnvironmentFactory.createExecutionEnvironment and obtains a ContextEnvironment.

As a result, StreamExecutionEnvironment.getExecutionEnvironment returns a StreamExecutionEnvironment (a StreamContextEnvironment) that wraps this ContextEnvironment.
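The precedence rule behind Utils.resolveFactory can be reproduced in a few lines: a factory bound to the current thread wins over the static one, and if neither is set the caller falls back to a local environment. In the sketch below, strings stand in for environments and every name (`EnvResolverSketch`, its fields and methods) is hypothetical; only the resolution order and the set/unset pairing follow the code discussed above, as I understand it.

```java
import java.util.Optional;
import java.util.function.Supplier;

public final class EnvResolverSketch {

    // Hypothetical counterparts of threadLocalContextEnvironmentFactory and contextEnvironmentFactory.
    private static final ThreadLocal<Supplier<String>> THREAD_LOCAL_FACTORY = new ThreadLocal<>();
    private static Supplier<String> staticFactory;

    /** Thread-local factory first, then the static one; empty if neither is set. */
    static <T> Optional<T> resolveFactory(ThreadLocal<T> threadLocal, T staticValue) {
        T local = threadLocal.get();
        return Optional.ofNullable(local != null ? local : staticValue);
    }

    /** Analogue of ContextEnvironment.setAsContext: installs the factory in both slots. */
    static void setAsContext(Supplier<String> factory) {
        staticFactory = factory;
        THREAD_LOCAL_FACTORY.set(factory);
    }

    /** Analogue of ContextEnvironment.unsetContext: clears both slots. */
    static void unsetContext() {
        staticFactory = null;
        THREAD_LOCAL_FACTORY.remove();
    }

    /** Analogue of getExecutionEnvironment: use the installed factory or fall back to a local one. */
    static String getExecutionEnvironment() {
        return resolveFactory(THREAD_LOCAL_FACTORY, staticFactory)
                .map(Supplier::get)
                .orElseGet(() -> "LocalEnvironment");
    }

    public static void main(String[] args) {
        System.out.println(getExecutionEnvironment()); // LocalEnvironment
        setAsContext(() -> "ContextEnvironment");
        try {
            System.out.println(getExecutionEnvironment()); // ContextEnvironment
        } finally {
            unsetContext();
        }
    }
}
```

This is why the same user main runs locally from an IDE but goes through the client's ContextEnvironment under flink run: nothing in the user code changes, only whether a factory was installed before main was invoked.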

The execute Flow

Once the user code in main has built the pipeline, it calls StreamExecutionEnvironment.execute, which in turn calls executeAsync(StreamGraph):

public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
    checkNotNull(streamGraph, "StreamGraph cannot be null.");
    checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");

    // Select the factory that matches the submission mode (execution.target)
    final PipelineExecutorFactory executorFactory =
        executorServiceLoader.getExecutorFactory(configuration);

    checkNotNull(
        executorFactory,
        "Cannot find compatible factory for specified execution.target (=%s)",
        configuration.get(DeploymentOptions.TARGET));

    // Obtain the matching executor and submit the job
    CompletableFuture<? extends JobClient> jobClientFuture = executorFactory
        .getExecutor(configuration)
        .execute(streamGraph, configuration);

    try {
        JobClient jobClient = jobClientFuture.get();
        jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
        return jobClient;
    } catch (Throwable t) {
        jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, t));
        ExceptionUtils.rethrow(t);

        // make javac happy, this code path will not be reached
        return null;
    }
}
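Stripped of the Flink types, the tail of executeAsync is a wait-then-notify step: block on the executor's CompletableFuture, call every JobListener once (with the client on success, or with the error on failure), and rethrow the failure. In the sketch below a `BiConsumer` stands in for JobListener.onJobSubmitted and a String for the JobClient; both are assumptions for illustration. Checked exceptions are wrapped in a RuntimeException, which matches what ExceptionUtils.rethrow does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

public final class SubmitNotifySketch {

    /**
     * Waits for the submission future, then notifies each listener once:
     * (client, null) on success or (null, error) on failure, and rethrows the failure.
     */
    static <C> C awaitAndNotify(CompletableFuture<C> clientFuture, List<BiConsumer<C, Throwable>> listeners) {
        try {
            C client = clientFuture.get();
            listeners.forEach(listener -> listener.accept(client, null));
            return client;
        } catch (Throwable t) {
            listeners.forEach(listener -> listener.accept(null, t));
            if (t instanceof InterruptedException) {
                Thread.currentThread().interrupt(); // keep the interrupt status visible to callers
            }
            // Same shape as ExceptionUtils.rethrow: unchecked throwables as-is, checked ones wrapped.
            if (t instanceof Error) {
                throw (Error) t;
            }
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            }
            throw new RuntimeException(t);
        }
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        List<BiConsumer<String, Throwable>> listeners = List.of(
                (client, error) -> events.add(error == null ? "submitted " + client : "failed: " + error));

        awaitAndNotify(CompletableFuture.completedFuture("job-client-1"), listeners);
        try {
            awaitAndNotify(CompletableFuture.<String>failedFuture(new IllegalStateException("no slots")), listeners);
        } catch (RuntimeException expected) {
            // The failure reached the listener before being rethrown.
        }
        System.out.println(events);
    }
}
```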

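In executeAsync, executorServiceLoader.getExecutorFactory relies on the SPI mechanism covered in the previous article: it loads all PipelineExecutorFactory implementations and picks the one that matches. A factory matches when it is compatible with the execution.target parameter mentioned earlier; for yarn-per-job that is YarnJobClusterExecutorFactory, which ultimately yields an executor of type YarnJobClusterExecutor that submits the job to YARN.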
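The matching step can be sketched without the SPI plumbing. In Flink the candidate factories come from java.util.ServiceLoader, and each factory's isCompatibleWith(Configuration) compares itself with execution.target; below, a plain list and a case-insensitive string comparison stand in for both, and `ExecutorFactory` is a hypothetical interface. Treating more than one match as an error is my reading of 1.10's DefaultExecutorServiceLoader; the empty result for no match corresponds to the checkNotNull guard in executeAsync above.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public final class ExecutorFactorySelectionSketch {

    /** Hypothetical stand-in for PipelineExecutorFactory. */
    interface ExecutorFactory {
        String name();

        /** Compatible when execution.target names this factory (case-insensitive in this sketch). */
        default boolean isCompatibleWith(Map<String, String> configuration) {
            return name().equalsIgnoreCase(configuration.get("execution.target"));
        }
    }

    /** Picks the single compatible factory; an ambiguous match is rejected. */
    static Optional<ExecutorFactory> getExecutorFactory(
            List<ExecutorFactory> candidates, Map<String, String> configuration) {
        List<ExecutorFactory> compatible = candidates.stream()
                .filter(f -> f.isCompatibleWith(configuration))
                .collect(Collectors.toList());
        if (compatible.size() > 1) {
            throw new IllegalStateException("Multiple compatible executor factories for " + configuration);
        }
        return compatible.stream().findFirst();
    }

    public static void main(String[] args) {
        // In Flink these would be discovered via ServiceLoader; here they are listed by hand.
        List<ExecutorFactory> candidates = List.of(() -> "remote", () -> "yarn-session", () -> "yarn-per-job");
        Optional<ExecutorFactory> chosen =
                getExecutorFactory(candidates, Map.of("execution.target", "yarn-per-job"));
        System.out.println(chosen.map(ExecutorFactory::name).orElse("no compatible factory"));
    }
}
```

Requiring a unique match, instead of taking the first one as the CLI selection does, means that adding a new executor module to the classpath can never silently change which executor an existing target resolves to.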

Summary

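This article traced flink run from startup up to the point just before the job is handed to the cluster. In my view the flow boils down to three steps: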

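  • Choose the appropriate parser for the command-line arguments (CustomCommandLine);

  • Choose the appropriate execution context (StreamExecutionEnvironment);

  • Choose the appropriate job submitter (PipelineExecutor).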

The next article will take yarn-per-job mode as an example and analyze its submission process in detail.
