As shown in the figure above, Flink has four kinds of graphs: StreamGraph, JobGraph, ExecutionGraph, and PhysicalGraph. Each belongs to a different stage and carries different responsibilities.



// StreamGraph.java
/**
 * Class representing the streaming topology. It contains all the information
 * necessary to build the jobgraph for the execution.
 */
public class StreamGraph implements Pipeline {

The StreamGraph can be inspected: add System.out.println(env.getExecutionPlan()); before the execute() call, and a JSON plan is printed. Paste that JSON into Flink's plan visualizer to see the StreamGraph. I will use the official SocketWindowWordCount example, which comes up several times later as well, so for completeness here is the full code:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Implements a streaming windowed version of the "WordCount" program.
 *
 * <p>This program connects to a server socket and reads strings from the socket.
 * The easiest way to try this out is to open a text server (at port 12345)
 * using the <i>netcat</i> tool via
 * <pre>
 * nc -l 12345 on Linux or nc -l -p 12345 on Windows
 * </pre>
 * and run this example with the hostname and the port as arguments.
 */
public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

        // the host and the port to connect to
        final String hostname;
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            hostname = params.has("hostname") ? params.get("hostname") : "localhost";
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount " +
                "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
                "and port is the address of the text server");
            System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
                "type the input text into the command line");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text

                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })

                .keyBy(value -> value.word)
                .timeWindow(Time.seconds(5))

                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // ------------------------------------------------------------------------

    /** Data type for words with count. */
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

This code is a very classic stream-processing pipeline: Source (socketTextStream) -> Map (flatMap) -> Window (keyBy/window) -> Sink (print), i.e. four operators (the parts in parentheses). Its StreamGraph looks like this:

With the default parallelism set to 1 (env.setParallelism(1);):



  1. For some reason parts of the screenshots are not displayed fully; I tried Chrome, Safari, and Firefox without luck. Only the parallelism labels are obscured, though, so the graphs are still readable.
  2. Two different parallelism settings are shown here on purpose, for comparison later on.
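For reference, the JSON printed by env.getExecutionPlan() for this pipeline has roughly the following shape (illustrative only: node ids, "contents" fields, and exact names vary by Flink version, but "nodes", "pact", "parallelism", "predecessors", and "ship_strategy" are the fields the plan visualizer consumes):

```json
{
  "nodes": [
    { "id": 1, "type": "Source: Socket Stream", "pact": "Data Source", "parallelism": 1 },
    { "id": 2, "type": "Flat Map", "pact": "Operator", "parallelism": 1,
      "predecessors": [ { "id": 1, "ship_strategy": "FORWARD" } ] },
    { "id": 4, "type": "Window(...)", "pact": "Operator", "parallelism": 1,
      "predecessors": [ { "id": 2, "ship_strategy": "HASH" } ] },
    { "id": 5, "type": "Sink: Print to Std. Out", "pact": "Data Sink", "parallelism": 1,
      "predecessors": [ { "id": 4, "ship_strategy": "FORWARD" } ] }
  ]
}
```

Note how the keyBy shows up not as a node of its own but as the HASH ship strategy on the edge into the window operator.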



Like the StreamGraph, the JobGraph is a directed acyclic graph (DAG), built from operators, the input/output relationships between them, and the intermediate data that connects them. The JobGraph is also called the Logical Graph or Dataflow Graph. (As an aside: the StreamGraph is of course a logical graph too, but it is a code-level, internal concept. The class is annotated @Internal and the concept never appears in the user documentation, whereas the JobGraph is exposed to users.)


A logical graph is a directed graph where the nodes are Operators and the edges define input/output-relationships of the operators and correspond to data streams or data sets.
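That definition can be made concrete with a toy model: nodes are operators, edges are input/output relationships, and "acyclic" means a topological order exists. This is a self-contained sketch (class and method names are mine, not Flink's):

```java
import java.util.*;

public class LogicalGraphSketch {
    // Nodes are operators; the adjacency list encodes input/output relationships
    // (i.e. the data streams between operators).
    static List<String> topologicalOrder(Map<String, List<String>> edges) {
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        edges.forEach((from, tos) -> {
            inDegree.putIfAbsent(from, 0);
            for (String to : tos) inDegree.merge(to, 1, Integer::sum);
        });
        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((n, d) -> { if (d == 0) ready.add(n); });
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String n = ready.poll();
            order.add(n);
            for (String to : edges.getOrDefault(n, List.of())) {
                if (inDegree.merge(to, -1, Integer::sum) == 0) ready.add(to);
            }
        }
        // If not every node was emitted, there was a cycle: not a valid dataflow DAG.
        if (order.size() != inDegree.size())
            throw new IllegalStateException("cycle detected: not a DAG");
        return order;
    }

    public static void main(String[] args) {
        Map<String, List<String>> wordCount = new LinkedHashMap<>();
        wordCount.put("Source", List.of("FlatMap"));
        wordCount.put("FlatMap", List.of("Window"));
        wordCount.put("Window", List.of("Sink"));
        System.out.println(topologicalOrder(wordCount));
        // [Source, FlatMap, Window, Sink]
    }
}
```

The topological order is exactly the order in which data flows through the WordCount pipeline above.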


// JobGraph.java
/**
 * The JobGraph represents a Flink dataflow program, at the low level that the JobManager accepts.
 * All programs from higher level APIs are transformed into JobGraphs.
 *
 * <p>The JobGraph is a graph of vertices and intermediate results that are connected together to
 * form a DAG. Note that iterations (feedback edges) are currently not encoded inside the JobGraph
 * but inside certain special vertices that establish the feedback channel amongst themselves.
 *
 * <p>The JobGraph defines the job-wide configuration settings, while each vertex and intermediate
 * result define the characteristics of the concrete operation and intermediate data.
 */
public class JobGraph implements Serializable {

The StreamGraph-to-JobGraph transformation also happens on the Client side, and its main purpose is optimization. One very important optimization is chaining several small operators into one larger operator (this process is called Operator Chaining and is covered later). Because at runtime every task is executed by a dedicated thread, chaining small tasks into larger ones significantly reduces the number of threads as well as the data transfer and interaction between them, improving throughput and lowering latency. Chaining can be controlled through configuration and code, and is enabled by default.
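The chaining decision can be sketched as follows. This is a simplified, self-contained model rather than Flink's actual StreamingJobGraphGenerator logic; it assumes (a subset of the real conditions) that two operators are chainable when chaining is enabled, the edge between them is a forward connection, and both sides have the same parallelism:

```java
public class ChainingSketch {
    // Simplified model of a StreamGraph edge between two operators.
    static class Edge {
        final String upstream, downstream;
        final int upstreamParallelism, downstreamParallelism;
        final boolean forwardPartitioner;  // FORWARD edge vs. e.g. HASH after a keyBy
        final boolean chainingEnabled;     // global or per-operator switch

        Edge(String up, String down, int upP, int downP, boolean fwd, boolean chain) {
            upstream = up; downstream = down;
            upstreamParallelism = upP; downstreamParallelism = downP;
            forwardPartitioner = fwd; chainingEnabled = chain;
        }

        // A rough subset of the real chainability conditions.
        boolean isChainable() {
            return chainingEnabled
                && forwardPartitioner
                && upstreamParallelism == downstreamParallelism;
        }
    }

    public static void main(String[] args) {
        // Source -> flatMap: forward edge, equal parallelism, so chainable.
        Edge e1 = new Edge("Source", "FlatMap", 1, 1, true, true);
        // flatMap -> window: goes through keyBy (hash partitioning), so not chainable.
        Edge e2 = new Edge("FlatMap", "Window", 1, 1, false, true);
        System.out.println(e1.upstream + "->" + e1.downstream + " chainable: " + e1.isChainable());
        System.out.println(e2.upstream + "->" + e2.downstream + " chainable: " + e2.isChainable());
    }
}
```

In user code the switch corresponds to env.disableOperatorChaining() for the whole job, or startNewChain() / disableChaining() on an individual operator.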

The JobGraph can also be viewed, on the job submission page of the Flink Web UI (again using SocketWindowWordCount as the example):







Obviously, running the job this way is not the most efficient, so pay a little attention when setting parallelism: try to let operators that can be merged end up chained together.

When the Client submits a job to the JobManager (its Dispatcher component), what it actually submits is the JobGraph plus the JAR files the job depends on.


After the JobManager receives the JobGraph and its dependency JARs from the Client, it starts scheduling the job. But the JobGraph is still a logical graph; it has to be further transformed into a parallelized, schedulable execution graph. This step is performed by the JobManager (specifically, its JobMaster component).
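The parallelization step can be sketched like this: every JobGraph vertex with parallelism p is expanded into p parallel subtasks (in Flink's terms, one ExecutionJobVertex with p ExecutionVertices). The following is a minimal, self-contained illustration; the class and method names are mine, not Flink's:

```java
import java.util.*;

public class ExpandSketch {
    // One node of the (logical) JobGraph.
    static class JobVertex {
        final String name;
        final int parallelism;
        JobVertex(String name, int parallelism) {
            this.name = name;
            this.parallelism = parallelism;
        }
    }

    // Expand each JobVertex into `parallelism` subtasks, analogous to what the
    // JobMaster does when it builds the ExecutionGraph from the JobGraph.
    static List<String> expand(List<JobVertex> jobGraph) {
        List<String> subtasks = new ArrayList<>();
        for (JobVertex v : jobGraph) {
            for (int i = 0; i < v.parallelism; i++) {
                subtasks.add(v.name + " (" + (i + 1) + "/" + v.parallelism + ")");
            }
        }
        return subtasks;
    }

    public static void main(String[] args) {
        // Two chained vertices from the WordCount example, each with parallelism 2.
        List<JobVertex> jobGraph = Arrays.asList(
            new JobVertex("Source -> FlatMap", 2),
            new JobVertex("Window -> Sink", 2));
        expand(jobGraph).forEach(System.out::println);
        // e.g. "Source -> FlatMap (1/2)", "Source -> FlatMap (2/2)", ...
    }
}
```

Each of the expanded subtasks is what eventually gets deployed to a TaskManager slot and executed by its own thread.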


// ExecutionGraph.java
/**
 * The execution graph is the central data structure that coordinates the distributed
 * execution of a data flow. It keeps representations of each parallel task, each
 * intermediate stream, and the communication between them.
 *
 * <p>The execution graph consists of the following constructs:
 *
 * <ul>
 *     <li>The {@link ExecutionJobVertex} represents one vertex from the JobGraph (usually one
 *         operation like "map" or "join") during execution. It holds the aggregated state of all
 *         parallel subtasks. The ExecutionJobVertex is identified inside the graph by the
 *         {@link JobVertexID}, which it takes from the JobGraph's corresponding JobVertex.</li>
 *     <li>The {@link ExecutionVertex} represents one parallel subtask. For each ExecutionJobVertex,
 *         there are as many ExecutionVertices as the parallelism. The ExecutionVertex is identified
 *         by the ExecutionJobVertex and the index of the parallel subtask.</li>
 *     <li>The {@link Execution} is one attempt to execute an ExecutionVertex. There may be multiple
 *         Executions for the ExecutionVertex, in case of a failure, or in the case where some data
 *         needs to be recomputed because it is no longer available when requested by later
 *         operations. An Execution is always identified by an {@link ExecutionAttemptID}. All
 *         messages between the JobManager and the TaskManager about deployment of tasks and updates
 *         in the task status always use the ExecutionAttemptID to address the message receiver.</li>
 * </ul>
 *
 * <h2>Global and local failover</h2>
 *
 * <p>The Execution Graph has two failover modes: <i>global failover</i> and <i>local failover</i>.
 *
 * <p>A <b>global failover</b> aborts the task executions for all vertices and restarts the whole
 * data flow graph from the last completed checkpoint. Global failover is considered the
 * "fallback strategy" that is used when a local failover is unsuccessful, or when an issue is
 * found in the state of the ExecutionGraph that could mark it as inconsistent (caused by a bug).
 *
 * <p>A <b>local failover</b> is triggered when an individual vertex execution (a task) fails.
 * The local failover is coordinated by the {@link FailoverStrategy}. A local failover typically
 * attempts to restart as little as possible, but as much as necessary.
 *
 * <p>Between local- and global failover, the global failover always takes precedence, because it
 * is the core mechanism that the ExecutionGraph relies on to bring back consistency. To guard
 * that, the ExecutionGraph maintains a <i>global modification version</i>, which is incremented
 * with every global failover (and other global actions, like job cancellation, or terminal
 * failure). Local failover is always scoped by the modification version that the execution graph
 * had when the failover was triggered. If a new global modification version is reached during
 * local failover (meaning there is a concurrent global failover), the failover strategy has to
 * yield before the global failover.
 */
public class ExecutionGraph implements AccessExecutionGraph {



A physical graph is the result of translating a Logical Graph for execution in a distributed runtime. The nodes are Tasks and the edges indicate input/output-relationships or partitions of data streams or data sets.







