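In Flink on Yarn mode there are two ways to submit a job. The first is the more indirect one: start a yarn-session to reserve resources, then submit Flink jobs to it. This works like a global variable: the Flink cluster stays resident in the Yarn cluster until it is stopped manually. The second is more like a local variable: a separate yarn cluster is started for each Flink job. Each approach has its strengths, and this post does not try to rank them.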

Submitting with yarn-session

First, let's look at the options the yarn-session submission script accepts:

./yarn-session.sh -h

As the output shows, the one required option is the number of TaskManagers, given with -n:

Usage:
Required
-n,--container <arg> Number of YARN container to allocate (=Number of Task Managers)
Optional
-at,--applicationType <arg> Set a custom application type for the application on YARN
-D <property=value> use value for given property
-d,--detached If present, runs the job in detached mode
-h,--help Help for the Yarn session CLI.
-id,--applicationId <arg> Attach to running YARN session
-j,--jar <arg> Path to Flink jar file
-jm,--jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB)
-m,--jobmanager <arg> Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
-n,--container <arg> Number of YARN container to allocate (=Number of Task Managers)
-nl,--nodeLabel <arg> Specify YARN node label for the YARN application
-nm,--name <arg> Set a custom name for the application on YARN
-q,--query Display available YARN resources (memory, cores)
-qu,--queue <arg> Specify YARN queue.
-s,--slots <arg> Number of slots per TaskManager
-sae,--shutdownOnAttachedExit If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.
-st,--streaming Start Flink in streaming mode
-t,--ship <arg> Ship files in the specified directory (t for transfer)
-tm,--taskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB)
-yd,--yarndetached If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode

With these options, a shared yarn session can be started quite simply:

./yarn-session.sh \
-n 3 \
-s 6 \
-jm 2048 \
-tm 1536 \
-nm "flink on yarn" \
-d
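As a rough sanity check on the request above, the sketch below works out what the session asks YARN for: the slot count is the number of containers times the slots per TaskManager, and the container memory is the JobManager memory plus one TaskManager memory per container. The function name session_resources is made up for illustration, and the figure ignores any rounding YARN applies to container sizes.

```shell
#!/usr/bin/env bash
# Hypothetical helper: estimate what a yarn-session request adds up to.
# Arguments mirror -n, -s, -jm and -tm (memory values in MB).
session_resources() {
  local n=$1 s=$2 jm=$3 tm=$4
  local slots=$(( n * s ))        # slots across all TaskManagers
  local mem=$(( jm + n * tm ))    # JobManager plus every TaskManager
  echo "slots=${slots} memory_mb=${mem}"
}

# Values from the command above: -n 3 -s 6 -jm 2048 -tm 1536
session_resources 3 6 2048 1536
```

With the values used here this prints slots=18 memory_mb=6656, which is worth comparing against what `./yarn-session.sh -q` reports as available before starting the session.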

Then we submit the job to the Yarn cluster:

flink run \
-c [main class] \
[jar path]
./flink <ACTION> [OPTIONS] [ARGUMENTS]
./flink run \
-m yarn-cluster \
-ys 8 \
-yn 3 \
-p 3 \
-ynm "flink yarn cluster" \
-yjm 2048 \
-ytm 1536 \
-d \
$2 outline --job $3
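The command above reads positional parameters, so it presumably lives inside a wrapper script. A minimal sketch of such a wrapper follows. It checks that the requested parallelism fits into the -yn times -ys slots and prints the command instead of running it. The function name build_run_cmd and the meaning given to its arguments (a jar path and a job name) are assumptions made for illustration; the original script's $2 and $3 may stand for something else. Some Flink releases allocate TaskManagers on demand, so treat the slot check as a sanity check rather than a hard rule.

```shell
#!/usr/bin/env bash
# Hypothetical dry-run wrapper: prints the per-job submission command
# instead of executing it, after a basic slot check.
# Assumption: the caller passes a jar path and a job name.
build_run_cmd() {
  local jar=$1 job=$2 yn=$3 ys=$4 p=$5
  if [ "$p" -gt $(( yn * ys )) ]; then
    echo "parallelism $p exceeds $(( yn * ys )) available slots" >&2
    return 1
  fi
  echo "./flink run -m yarn-cluster -yn $yn -ys $ys -p $p -d $jar --job $job"
}

# Same sizing as the command above: 3 containers, 8 slots each, parallelism 3
build_run_cmd my-job.jar demo 3 8 3
```

Printing the command first makes it easy to review the flags before swapping the final echo for a real invocation.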


Below I only explain the options I use here; for the full list, see:

./flink -h

For ACTION:

Action "run" compiles and runs a program.

Syntax: run [OPTIONS] <jar-file> <arguments>
"run" action options:
-c,--class <classname>
-C,--classpath <url>
-d,--detached If present, runs the job in detached mode (the console does not need to stay attached once the job has been submitted)

-p,--parallelism <parallelism> Sets the parallelism
-py,--python <python> Python script with the program entry point. The dependent resources can be configured with the `--pyFiles` option.
-pyfs,--pyFiles <pyFiles> Attach custom python files for job. Comma can be used as the separator to specify multiple files. The standard python resource file suffixes such as .py/.egg/.zip are all supported.(eg: --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip)
-pym,--pyModule <pyModule> Python module with the program entry point. This option must be used in conjunction with `--pyFiles`.
Options for yarn-cluster mode:
-d,--detached If present, runs the job in detached mode (the console does not need to stay attached once the job has been submitted)
-m,--jobmanager <arg> Address of the JobManager (master) to
which to connect. Use this flag to
connect to a different JobManager than
the one specified in the
configuration.
-yh,--yarnhelp Help for the Yarn session CLI.
-yj,--yarnjar <arg> Path to Flink jar file
-yjm,--yarnjobManagerMemory <arg> Memory for JobManager Container with
optional unit (default: MB)
-yn,--yarncontainer <arg> Number of YARN container to allocate
(=Number of Task Managers)
-ynm,--yarnname <arg> Set a custom name for the application
on YARN

-ys,--yarnslots <arg> Number of slots per TaskManager
-ytm,--yarntaskManagerMemory <arg> Memory per TaskManager Container with
optional unit (default: MB)

Action "info" shows the optimized execution plan of the program (JSON).

Syntax: info [OPTIONS] <jar-file> <arguments>
"info" action options:
-c,--class <classname> Class with the program entry point
("main()" method or "getPlan()" method).
Only needed if the JAR file does not
specify the class in its manifest.
-p,--parallelism <parallelism> The parallelism with which to run the
program. Optional flag to override the
default value specified in the
configuration.

Action "list" lists running and scheduled programs.

Syntax: list [OPTIONS]
"list" action options:
-r,--running Show only running programs and their JobIDs
-s,--scheduled Show only scheduled programs and their JobIDs
Options for yarn-cluster mode:
-m,--jobmanager <arg> Address of the JobManager (master) to
which to connect. Use this flag to connect
to a different JobManager than the one
specified in the configuration.
-yid,--yarnapplicationId <arg> Attach to running YARN session
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode

Options for default mode:
-m,--jobmanager <arg> Address of the JobManager (master) to which
to connect. Use this flag to connect to a
different JobManager than the one specified
in the configuration.
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths
for high availability mode

Action "stop" stops a running program with a savepoint (streaming jobs only).

Syntax: stop [OPTIONS] <Job ID>
"stop" action options:
-d,--drain Send MAX_WATERMARK before taking the
savepoint and stopping the pipelne.
Options for yarn-cluster mode:
-m,--jobmanager <arg> Address of the JobManager (master) to
which to connect. Use this flag to connect
to a different JobManager than the one
specified in the configuration.
-yid,--yarnapplicationId <arg> Attach to running YARN session

Action "cancel" cancels a running program.

Syntax: cancel [OPTIONS] <Job ID>
"cancel" action options:
-s,--withSavepoint <targetDirectory> **DEPRECATION WARNING**: Cancelling
a job with savepoint is deprecated.
Use "stop" instead.
Trigger savepoint and cancel job.
The target directory is optional. If
no directory is specified, the
configured default directory
(state.savepoints.dir) is used.
Options for yarn-cluster mode:
-m,--jobmanager <arg> Address of the JobManager (master) to
which to connect. Use this flag to connect
to a different JobManager than the one
specified in the configuration.
-yid,--yarnapplicationId <arg> Attach to running YARN session
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode

Options for default mode:
-m,--jobmanager <arg> Address of the JobManager (master) to which
to connect. Use this flag to connect to a
different JobManager than the one specified
in the configuration.
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths
for high availability mode
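To tie the list, stop and cancel actions together, here is a small sketch that prints the commands for managing a job on a running YARN session, using only the -r and -yid options shown in the help text above. The function name job_cmds is hypothetical, and the application and job IDs are placeholders to be replaced with real values.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print (not run) the commands for managing a job
# that runs on a YARN session. Both IDs are placeholders.
job_cmds() {
  local app_id=$1 job_id=$2
  echo "./flink list -r -yid $app_id"            # show running jobs and their JobIDs
  echo "./flink stop -yid $app_id $job_id"       # stop with a savepoint (streaming jobs only)
  echo "./flink cancel -yid $app_id $job_id"     # cancel without a savepoint
}

job_cmds "<application-id>" "<job-id>"
```

Since the help text marks cancel with a savepoint as deprecated, stop is the action to reach for when the job's state needs to be kept.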