
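Big Data Compute Engines: Configuring High Availability for Flink

High availability: what is it? Put simply, it means the system stays usable even when part of it fails. We configured high availability for Hadoop earlier, and the goal was the same: when something in Hadoop breaks, the system should not fall over on the spot. For example, with two NameNodes configured, when one NameNode dies the other one takes over. Flink is no different. In essence, it comes down to configuring two (or more) JobManagers.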

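Standalone Cluster High Availability

For a Flink standalone cluster, JobManager high availability means one leader JobManager plus one or more standby JobManagers. When the leader dies, a standby steps up and takes over the Flink cluster. Once the standby is in charge, everything goes back to normal (so having a backup is a good thing for the leader). A standby and the leader are the same kind of process with no real difference between them, which means every JobManager can act as either a standby or the leader. Here is the diagram from the official documentation: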

[Figure: jobmanager_ha_overview]

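Configuration

According to the official documentation, to enable JobManager high availability (HA) you must set the high-availability mode to zookeeper, configure the ZooKeeper quorum, and list every JobManager host together with its web UI port in the masters file. Flink uses ZooKeeper for distributed coordination among all running JobManager instances. ZooKeeper is a service separate from Flink that provides highly reliable distributed coordination through leader election and lightweight consistent state storage.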

The masters file

The official documentation says: In order to start an HA-cluster configure the masters file in conf/masters:

  • conf/masters: list each JobManager host and its Flink web UI port here.

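By default, the JobManager picks a random port for inter-process communication. You can change this with the high-availability.jobmanager.port key. It accepts a single port (e.g. 50010), a range (50000-50025), or a combination of both (50010,50011,50020-50025,50050-50075).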
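To make the port syntax concrete, here is a minimal Python sketch that expands such a value into the individual ports it covers. The function name expand_port_spec is made up for illustration; this is not Flink's own parser, only a way to see which ports a given setting would allow.

```python
from typing import List


def expand_port_spec(spec: str) -> List[int]:
    """Expand a value such as "50010,50020-50025" into a sorted list of ports.

    Hypothetical helper for illustration only; not part of Flink.
    """
    ports = set()
    for part in spec.split(","):
        part = part.strip()
        if not part:
            continue
        if "-" in part:
            # A range such as 50020-50025 includes both end points.
            low, high = (int(p) for p in part.split("-", 1))
            if low > high:
                raise ValueError("invalid port range: " + part)
            ports.update(range(low, high + 1))
        else:
            ports.add(int(part))
    return sorted(ports)


candidates = expand_port_spec("50010,50011,50020-50025,50050-50075")
print(len(candidates), candidates[0], candidates[-1])
```

Something like this is handy when you need to open the same ports in a firewall rule.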

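The official documentation says: In order to start an HA-cluster add the following configuration keys to conf/flink-conf.yaml:

  • high-availability mode (required): in conf/flink-conf.yaml, the high-availability mode must be set to zookeeper to turn HA on. Alternatively, set this option to the FQN of a factory class that Flink uses to create the HighAvailabilityServices instance.

  • ZooKeeper quorum (required): the addresses of the ZooKeeper servers.

  • ZooKeeper root (recommended): the root ZooKeeper node, under which all cluster nodes are placed.

  • ZooKeeper cluster-id (recommended): the cluster-id ZooKeeper node, under which all data belonging to one cluster is placed.

Important: do not set this value manually when running a YARN cluster, a per-job YARN session, or on another cluster manager. In those cases a cluster-id is generated automatically from the application ID, and setting it manually overrides that behaviour on YARN. Specifying a cluster ID with the -z CLI option, in turn, overrides the manual configuration. If you run multiple Flink HA clusters on bare metal, you must configure a separate cluster-id for each cluster by hand.

  • Storage directory (required): JobManager metadata is persisted in the file system under storageDir; ZooKeeper only stores a pointer to this state.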

Example

high-availability: zookeeper
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_id
high-availability.zookeeper.quorum: hadoop101:2181,hadoop102:2181,hadoop103:2181
high-availability.storageDir: hdfs://hadoop101:9000/flink/ha
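Since three of these keys are marked as required, a quick pre-flight check before starting the cluster can save a restart. The sketch below is a rough illustration, not Flink's own loader: parse_flat_conf and missing_ha_keys are hypothetical helpers, and the parser only understands flat "key: value" lines with a space after the colon.

```python
# Keys the list above marks as required for ZooKeeper HA.
REQUIRED_HA_KEYS = (
    "high-availability",
    "high-availability.zookeeper.quorum",
    "high-availability.storageDir",
)


def parse_flat_conf(text):
    """Parse flat 'key: value' lines, skipping blanks and '#' comments.

    Assumption of this sketch: a line without ': ' is ignored.
    """
    conf = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition(": ")
        if not sep:
            continue
        conf[key.strip()] = value.strip()
    return conf


def missing_ha_keys(conf):
    """Return the required HA keys that are absent or empty."""
    return [key for key in REQUIRED_HA_KEYS if not conf.get(key)]


sample = """\
high-availability: zookeeper
high-availability.cluster-id: /cluster_id
high-availability.zookeeper.quorum: hadoop101:2181,hadoop102:2181,hadoop103:2181
"""
print(missing_ha_keys(parse_flat_conf(sample)))
```

In real use you would read conf/flink-conf.yaml from disk and pass its contents to parse_flat_conf instead of the inline sample.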

Demo: masters

hadoop101:8081
hadoop102:8081
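Each line of the masters file is a host:port pair, and HA only makes sense with at least two of them. A small sketch of reading the file and checking that, with parse_masters as a hypothetical helper name:

```python
def parse_masters(text):
    """Turn 'host:port' lines into (host, port) tuples.

    Hypothetical helper; blank lines and '#' comments are skipped.
    """
    entries = []
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        host, sep, port = line.rpartition(":")
        if not sep or not host:
            raise ValueError("expected host:port, got: " + line)
        entries.append((host, int(port)))
    return entries


masters = parse_masters("hadoop101:8081\nhadoop102:8081\n")
print(masters)
if len(masters) < 2:
    print("only one JobManager listed, so there is nothing to fail over to")
```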

Demo: slaves

hadoop102
hadoop103
hadoop101

Demo: zoo.cfg

server.101=hadoop101:2888:3888
server.102=hadoop102:2888:3888
server.103=hadoop103:2888:3888
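The hosts in these server entries should be the same ones that appear in high-availability.zookeeper.quorum. As a rough cross-check, the sketch below derives a quorum string from the server lines. The helper name quorum_from_zoo_cfg is made up, and the client port 2181 is taken from the example configuration in this post rather than read from zoo.cfg.

```python
import re

# Matches lines such as: server.101=hadoop101:2888:3888
SERVER_LINE = re.compile(r"^server\.(\d+)=([^:\s]+):(\d+):(\d+)")


def quorum_from_zoo_cfg(text, client_port=2181):
    """Build a 'host:port,host:port' string from zoo.cfg server entries.

    Hypothetical helper; client_port is an assumption, not parsed from the file.
    """
    hosts = []
    for raw in text.splitlines():
        match = SERVER_LINE.match(raw.strip())
        if match:
            hosts.append(match.group(2))
    return ",".join("{}:{}".format(host, client_port) for host in hosts)


zoo_cfg = """\
server.101=hadoop101:2888:3888
server.102=hadoop102:2888:3888
server.103=hadoop103:2888:3888
"""
print(quorum_from_zoo_cfg(zoo_cfg))
```

If the printed string differs from the quorum value in flink-conf.yaml, one of the two files probably has a typo.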

Result:

I ran into these two problems:

Q1: Flink cannot write to the Hadoop file system

java.io.IOException: Could not create FileSystem for highly available storage (high-availability.storageDir)
at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:119)
at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:92)
at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:120)
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.<init>(TaskManagerRunner.java:131)
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:326)
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner$1.call(TaskManagerRunner.java:296)
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner$1.call(TaskManagerRunner.java:293)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:293)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:447)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:359)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:116)
... 8 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:58)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:443)
... 11 more

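This happens because the Flink distribution I downloaded does not bundle Hadoop, so the Hadoop dependency jar has to be added to Flink's lib directory by hand. This is the jar I used: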

flink-shaded-hadoop-2-uber-2.7.5-8.0.jar
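Before restarting, it is easy to confirm that the jar really landed in lib. The sketch below looks for files matching flink-shaded-hadoop-*.jar, a pattern derived from the jar name above; find_hadoop_jars is a hypothetical helper, and the demo runs against a temporary directory rather than a real Flink installation.

```python
import tempfile
from pathlib import Path


def find_hadoop_jars(lib_dir):
    """Return the names of shaded Hadoop jars found directly in lib_dir."""
    return sorted(p.name for p in Path(lib_dir).glob("flink-shaded-hadoop-*.jar"))


# Demo against a throwaway directory standing in for Flink's lib folder.
with tempfile.TemporaryDirectory() as lib_dir:
    print(find_hadoop_jars(lib_dir))  # nothing there yet
    Path(lib_dir, "flink-shaded-hadoop-2-uber-2.7.5-8.0.jar").touch()
    print(find_hadoop_jars(lib_dir))
```

Point lib_dir at the lib folder of your own Flink installation on every node; an empty result there would be consistent with the "Hadoop is not in the classpath/dependencies" error above.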

Q2: A complaint about missing configuration

org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint StandaloneSessionClusterEntrypoint.
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:182)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:501)
at org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint.main(StandaloneSessionClusterEntrypoint.java:65)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Configuration is missing the mandatory parameter: Key: 'high-availability.storageDir' , default: null (fallback keys: [{key=high-availability.zookeeper.storageDir, isDeprecated=true}, {key=recovery.zookeeper.storageDir, isDeprecated=true}])
at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:102)
at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:92)
at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:120)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:292)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:257)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:202)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:163)
... 2 more

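This one was the most infuriating: I followed the official documentation to the letter, so why would it not work? Honestly. What got it working for me was adding two more entries to flink-conf.yaml, both of which are the deprecated fallback keys named in the exception above: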

high-availability.zookeeper.storageDir: hdfs://hadoop101:9000/flink/flink-zk-ha
recovery.zookeeper.storageDir: hdfs://hadoop101:9000/flink/recovery

With that, high availability is configured. Here is the result:

[Image: 1574393311565]
