本文介绍Flink任务流转过程中涉及的图,知道这些可以更好的了解Flink的运行流程。
如上图,Flink中有4种图: StreamGraph 、 JobGraph 、 ExecutionGraph 、 PhysicalGraph ,分别处于不同的阶段,承担不同的职责。
StreamGraph
StreamGraph其实就是把我们的代码逻辑以拓扑图的形式组织了一下,其实现类的描述如下:
// StreamGraph.java /** * Class representing the streaming topology. It contains all the information * necessary to build the jobgraph for the execution. */ @Internal public class StreamGraph implements Pipeline { ... }
StreamGraph可以查看,方法是在我们的 execute()
代码之前加入 System.out.println(env.getExecutionPlan());
,这样会输出一个JSON,将这个JSON粘贴到 这里 就可以查看StreamGraph。以官方的 SocketWordCount 为例,后面还会多次用到这个例子,所以为了完整性,我把代码贴到这里:
/** * Implements a streaming windowed version of the "WordCount" program. * * <p>This program connects to a server socket and reads strings from the socket. * The easiest way to try this out is to open a text server (at port 12345) * using the <i>netcat</i> tool via * <pre> * nc -l 12345 on Linux or nc -l -p 12345 on Windows * </pre> * and run this example with the hostname and the port as arguments. */ @SuppressWarnings("serial") public class SocketWindowWordCount { public static void main(String[] args) throws Exception { // the host and the port to connect to final String hostname; final int port; try { final ParameterTool params = ParameterTool.fromArgs(args); hostname = params.has("hostname") ? params.get("hostname") : "localhost"; port = params.getInt("port"); } catch (Exception e) { System.err.println("No port specified. Please run 'SocketWindowWordCount " + "--hostname <hostname> --port <port>', where hostname (localhost by default) " + "and port is the address of the text server"); System.err.println("To start a simple text server, run 'netcat -l <port>' and " + "type the input text into the command line"); return; } // get the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // get input data by connecting to the socket DataStream<String> text = env.socketTextStream(hostname, port, "\n"); // parse the data, group it, window it, and aggregate the counts DataStream<WordWithCount> windowCounts = text .flatMap(new FlatMapFunction<String, WordWithCount>() { @Override public void flatMap(String value, Collector<WordWithCount> out) { for (String word : value.split("\\s")) { out.collect(new WordWithCount(word, 1L)); } } }) .keyBy(value -> value.word) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .reduce(new ReduceFunction<WordWithCount>() { @Override public WordWithCount reduce(WordWithCount a, WordWithCount b) { return new WordWithCount(a.word, a.count + b.count); } }); // print the results with a single thread, rather than in parallel windowCounts.print().setParallelism(1); env.execute("Socket Window WordCount"); } // ------------------------------------------------------------------------ /** * Data type for words with count. */ public static class WordWithCount { public String word; public long count; public WordWithCount() {} public WordWithCount(String word, long count) { this.word = word; this.count = count; } @Override public String toString() { return word + " : " + count; } } }
这个代码是一个非常经典的流处理过程:Source( socketTextStream
) –> Map( flatMap
) –> Window( keyByWindow
) –> Sink( print
),这个过程包含了4个Operator(即括号中的部分)。它的StreamGraph长下面这样:
默认并行度设置为1( env.setParallelism(1);
)时:
默认并行度设置为4时:
注:
- 不知道为什么有些地方展示不全,Chrome、Safari、Firefox都试了,还是不行,不过只是遮挡了并行度,也能看出来,不是很影响。
- 这里分别展示了两个不同的并行度,是为了后面的对比用。
程序真正运行时也会先转化为StreamGraph,然后进一步转化为JobGraph。
JobGraph
JobGraph和StreamGraph一样,也是一个有向无环图(DAG),这个图是由operator、operator间的输入输出关系、中间数据连接而成。JobGraph也被称为 Logical Graph 或者 Dataflow Graph (题外话,StreamGraph当然也是一个逻辑图,但StreamGraph这个概念其实是代码级的,并不对外体现,所以可以看到代码里面类上面加了 @Internal
,并且用户文档中从来没有提及这个概念,而JobGraph则是对外暴露的)。
官网描述如下:
A logical graph is a directed graph where the nodes are Operators and the edges define input/output-relationships of the operators and correspond to data streams or data sets.
代码注释说明如下:
// JobGraph.java /** * The JobGraph represents a Flink dataflow program, at the low level that the JobManager accepts. * All programs from higher level APIs are transformed into JobGraphs. * * <p>The JobGraph is a graph of vertices and intermediate results that are connected together to * form a DAG. Note that iterations (feedback edges) are currently not encoded inside the JobGraph * but inside certain special vertices that establish the feedback channel amongst themselves. * * <p>The JobGraph defines the job-wide configuration settings, while each vertex and intermediate result * define the characteristics of the concrete operation and intermediate data. */ public class JobGraph implements Serializable { ... }
StreamGraph到JobGraph的转化也是在Client端进行的,主要工作做优化。其中非常重要的一个优化就是将一个个小的Operator串接成一个大的Operator(这个过程叫Operator Chain,后面会讲到)。因为执行的时候每个任务都是由一个单独的线程执行的,将小任务串接成大任务可以显著减少线程数以及线程间的数据传递和交互,提高吞吐,降低延迟。这个是可以配置和代码控制的,默认允许串接。
JobGraph也可以查看,方法是在Flink的Web UI提交页面上面(仍以SocketWordCount为例):
注意:如果应用有必填参数一定要填,否则会报错。
默认并行度设置为1时的JobGraph:
可以看到默认并行度设置为1的时候,优化器把source和flatmap这两个operator串接(chain)成一个大的子任务了,把后面的window和sink两个operator串接(chain)成一个大的子任务了,这样到时候source和flatmap就在一个线程里面执行,window和sink在一个线程里面运行。
默认并行度设置为4时的JobGraph:
如果我们把默认并行度设置为4,那图就变成了上面这样,可以看到相比于并行度为1的图,没有任何合并。主要原因是:
socketTextStream
很显然,这样的运行就不是最高效的,所以在并行度的控制上要稍微注意一下,尽量让能够合并的operator chain在一起。
Client向JobManager(Dispatcher模块)提交Job时,实质提交的就是 JobGraph 和 该Job依赖的jar包 。
ExecutionGraph
JobManager接收到Client端提交的JobGraph及其依赖Jar之后就要开始调度运行该任务了,但JobGraph还是一个逻辑上的图,需要再进一步转化为 并行化 、 可调度 的执行图。这个动作是JobManager(其中的JobMaster组件)做的。
这一层已经更加具体化了,代码说明比较长,但对于使用者基本无需关注,为了完整性,这里也附一下,有兴趣的自行查看:
/** * The execution graph is the central data structure that coordinates the distributed * execution of a data flow. It keeps representations of each parallel task, each * intermediate stream, and the communication between them. * * <p>The execution graph consists of the following constructs: * <ul> * <li>The {@link ExecutionJobVertex} represents one vertex from the JobGraph (usually one operation like * "map" or "join") during execution. It holds the aggregated state of all parallel subtasks. * The ExecutionJobVertex is identified inside the graph by the {@link JobVertexID}, which it takes * from the JobGraph's corresponding JobVertex.</li> * <li>The {@link ExecutionVertex} represents one parallel subtask. For each ExecutionJobVertex, there are * as many ExecutionVertices as the parallelism. The ExecutionVertex is identified by * the ExecutionJobVertex and the index of the parallel subtask</li> * <li>The {@link Execution} is one attempt to execute a ExecutionVertex. There may be multiple Executions * for the ExecutionVertex, in case of a failure, or in the case where some data needs to be recomputed * because it is no longer available when requested by later operations. An Execution is always * identified by an {@link ExecutionAttemptID}. All messages between the JobManager and the TaskManager * about deployment of tasks and updates in the task status always use the ExecutionAttemptID to * address the message receiver.</li> * </ul> * * <h2>Global and local failover</h2> * * <p>The Execution Graph has two failover modes: <i>global failover</i> and <i>local failover</i>. * * <p>A <b>global failover</b> aborts the task executions for all vertices and restarts whole * data flow graph from the last completed checkpoint. Global failover is considered the * "fallback strategy" that is used when a local failover is unsuccessful, or when a issue is * found in the state of the ExecutionGraph that could mark it as inconsistent (caused by a bug). * * <p>A <b>local failover</b> is triggered when an individual vertex execution (a task) fails. * The local failover is coordinated by the {@link FailoverStrategy}. A local failover typically * attempts to restart as little as possible, but as much as necessary. * * <p>Between local- and global failover, the global failover always takes precedence, because it * is the core mechanism that the ExecutionGraph relies on to bring back consistency. The * guard that, the ExecutionGraph maintains a <i>global modification version</i>, which is incremented * with every global failover (and other global actions, like job cancellation, or terminal * failure). Local failover is always scoped by the modification version that the execution graph * had when the failover was triggered. If a new global modification version is reached during * local failover (meaning there is a concurrent global failover), the failover strategy has to * yield before the global failover. */ public class ExecutionGraph implements AccessExecutionGraph { ... }
PhysicalGraph
物理图其实是任务调度到TaskManager上面真正执行的一种“运行图”,它没有具体对应的类。官方说明如下:
A physical graph is the result of translating a Logical Graph for execution in a distributed runtime. The nodes are Tasks and the edges indicate input/output-relationships or partitions of data streams or data sets.
这四个图越往下,越具体,也越难理解,但实际中对于使用而言,一般只需要知道有这几个环节转换即可,细节无需太关注。下面附一张Jark大神博客的图,更多细节可见参考部分给的他的文章:
总结
其实图这个概念并非Flink原创,Storm、Spark里面也有类似的概念,Flink也是站在巨人的肩膀上而已。从StreamGraph迭代到最终的PhysicalGraph,由抽象到具体,抽象的给人看,具体的给框架看,有点类似于代码编译时的预处理、编译、汇编、链接的过程。
参考: