0%

大数据计算引擎之Flink源码阅读(1) WordCount

老夫被安排写Flink也有几个月了,虽然一来就是写的FlinkSQL一块的内容,但是写的似乎都忘了Flink的主流那就是流处理了。正好自己的入门项目也不曾做个Flink的WordCount,再加上老夫最近想看看Flink的源码程序了,所以,就拿WordCount来走走Flink的流的流程。学习一下Flink的流处理。

本文参考字自 透过源码

先编写程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* @author Mr.Sun
* @version v.1.0
* @title WordCount
* @description Flink 学习之WordCount入门
* @date 2019/12/13 9:15
*/
public class WordCount {
public static void main(String[] args) throws Exception {
StreamEnv streamEnv = StreamEnv.builder().enableRestart().setParallelism(1).finish();
StreamExecutionEnvironment env = streamEnv.getEnv();

DataStream<String> socketTextStream = env.socketTextStream("hadoop103", 5555);

socketTextStream.flatMap(new WordFlatMapFunction()).setParallelism(1)
.keyBy(0)
.sum(1)
.print() ;

env.execute("test");
}

private static class WordFlatMapFunction implements FlatMapFunction<String , Tuple2<String,Integer>>{
@Override
public void flatMap(String word, Collector<Tuple2<String,Integer>> collector) throws Exception {
collector.collect(new Tuple2(word , 1));
}
}
}

这里的环境获取老夫是提前写了一个单例的,您也可以直接在这里去拿到环境变量。,这里呢,老夫也把简单写的这个单例拿出来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
* @author Mr.Sun
* @version v.1.0
* @title StreamEnv
* @description
* @date 2019/12/13 9:09
*/
public class StreamEnv {
private final StreamExecutionEnvironment env;
private StreamTableEnvironment tableEnv;
private Builder builder ;

private StreamEnv(Builder builder) {
env = StreamExecutionEnvironment.getExecutionEnvironment();
if (builder.RESTART){
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3,
Time.of(5, TimeUnit.MINUTES),
Time.of(10,TimeUnit.SECONDS)
));
}
if (builder.parallelism != null ){
env.setParallelism(builder.parallelism);
}
env.enableCheckpointing(1000 , CheckpointingMode.EXACTLY_ONCE) ;

this.builder = builder ;
}

public StreamExecutionEnvironment getEnv() {
return env;
}

public StreamTableEnvironment getTableEnv() {
if(tableEnv == null) {
tableEnv = StreamTableEnvironment.create(env);
}
return tableEnv;
}
public static Builder builder() {
return new Builder();
}
public static class Builder{
private boolean RESTART = false ;
private Integer parallelism ;

public Builder enableRestart(){
this.RESTART = true ;
return this ;
}
public Builder setParallelism(int parallelism){
this.parallelism = parallelism ;
return this ;
}

public StreamEnv finish(){
return new StreamEnv(this) ;
}
}

程序写好了,我们便在 hostname为hadoop103上开启我们的服务:

1
nc -l 5555

随后输入我们的word,可以看到我们的结果:
demo-test

跟进程序,从环境开始

程序的启动从这句开始:

1
env = StreamExecutionEnvironment.getExecutionEnvironment();

这会返回一个我们Flink的流的执行环境,如果要返回批的执行环境就是如下:

1
env = ExecutionEnvironment.getExecutionEnvironment();

所谓的执行环境是针对整个Flink的程序的执行上下文的,在环境中会存在一些配置,例如,老夫这里设置的是否开启失败重启策略?是否设置多并行度等等,我们可以进去看看这个环境里面给我们哪些信息:
LocalStreamEnvironment

这是可以清除的看到,我们的StreamExecutionEnvironment在流中是一个父类,子类包括 LocalStreamEnvironment,RemoteStreamEnvironment,StreamPlanEnvironment,StreamContextEnvironment.
StreamExecutionEnvironment
这张图给出了StreamExecutionEnvironment里面的方法,参数,配置等基本信息,图有点小,建议您自己去看看源码。
对于分布式流处理程序来说,我们在代码中定义了如:flatMap(),keyBy,sum()等,事实是是一种可以理解为声明的方式,旨在告诉我们程序,这里我使用了flatMap()算子,但是真正启动计算的代码不在这里,我们的Flink是一个懒加载过程,所以必须要有:env.execute() 去执行我们的Flink程序。由于我们是在本地运行程序,因此这里会返回一个LocalStreamEnvironment。

算子(Operator)的注册声明

上面老夫使用的算子,第一个就是 flatMap(new WordFlatMapFunction())的,我们跟进去一看便知:

1
2
3
4
5
6
7
8
9
10
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {

TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
getType(), Utils.getCallLocationName(), true);
return flatMap(flatMapper, outType);
}
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {
return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));

}

我们可以看到,这里首先是先拿到了一个flatMap算子的输出类型 outType然后又生成了一个Operator,这也符合Flink的流式计算的核心概念了。就是所谓的链式处理。数据流从一输入流一个个传递给Operator进行链式处理,最后到我们的输出。针对数据流的每一次处理就是一次 operator,这里 operator之间还可以组成一个chain来处理。
Flink如何看待用户的处理流程呢?官网有这么一个介绍:
flink-dela-steam
抽象下来就是:
flink-transform
关于这里面的算子,我们看这个图,就可以知道 DataStreamSource -> SingleOutputStreamOperator -> DataStream的关系,以及相关的算子方法。
DataStreamSource

程序的执行

前面有提到所谓的算子只是一个声明,不代表你声明了就会去执行,Flink的执行与Spark相同,都属于那种懒加载的方式,所以我们需要自己去开启Flink程序的执行。

1
2
// 这里的参数,是我们的JobName
env.execute("test");

这里的源码看的有点蒙:看下源码

1
2
3
4
5
public JobExecutionResult execute(String jobName) throws Exception {
Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");

return execute(getStreamGraph(jobName));
}

我既然给了一个jobName , 为什么还要去检查这个jobName是否为空呢?纠结

本地模式下的execute方法

这行代码主要做这几件事:

  • 生成StreamGraph,表示流拓扑的类。 它包含为执行建立作业图所需的所有信息。

  • 生成JobGraph,这个图是要交给Flink去生成Task的图

  • 生成一系列的配置

  • 将JobGraph和配置交给Flink集群去运行,本地运行会为我们生成一个最小的集群环境,也就是我们的本地环境,如果不是本地运行的话,还会把JAR文件通过网络发给其他节点;

  • 以本地模式运行的话,可以看到启动过程,如启动性能度量,WEB模块,JobManager,ResourceManager,TaskManager等

  • 启动任务,值得一提的是在启动任务之前,先启动了一个用户类加载器,这个类加载器可以用来做一些在运行时动态加载类的工作。

远程模式下(RemoteEnvironment) 的execute()方法

这里老夫还没有跟进看,就看别人写的吧

远程模式的程序执行更加有趣一点。第一步仍然是获取StreamGraph,然后调用executeRemotely方法进行远程执行。该方法首先创建一个用户代码加载器:

1
ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, globalClasspaths, getClass().getClassLoader());

然后创建一系列配置,交给Client对象:

1
2
3
4
5
6
7
8
9
    ClusterClient client;
try{
client = new StandaloneClusterClient(configuration);
client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
}
}
try{
return client.run(streamGraph,jarFiles,globalClasspaths,usercodeClassLoader).getJobExecutionResult();
}

client 方法首先生成一个 JobGraph,然后传递给JobClient.关于Client、JobClient、JobManager到底谁管谁,可以看这个:
flink-clinet-jobclient-manager
确切的说,JobClient负责以异步的方式和JobManager通信(Actor是scala的异步模块),具体的通信任务有JobClientActor完成。相对应的,JobManager的通信任务也有一个Actor完成。

1
2
3
4
5
6
7
8
JobListeningContext jobListeningContext = submitJob(actorSystem,
config, highAvailabilityServices,
jobGraph,
timeout,
sysoutLogUpdates,
classLoader);

return awaitJobResult(jobListeningContext);

可以看到,该方法阻塞在awaitJobResult方法上,并最终返回了一个JobListeningContext,透过这个Context可以得到程序运行的状态和结果.

两种环境基本就是这样子的,现在我们的程序执行了execute(),跟进看一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
// 这里生成一个JobGraph
JobGraph jobGraph = streamGraph.getJobGraph();
jobGraph.setAllowQueuedScheduling(true);
// 生成配置文件
Configuration configuration = new Configuration();
configuration.addAll(jobGraph.getJobConfiguration());
configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
// add (and override) the settings with what the user defined
configuration.addAll(this.configuration);

if (!configuration.contains(RestOptions.BIND_PORT)) {
configuration.setString(RestOptions.BIND_PORT, "0");
}

int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
// 生成一个最小的cluster配置
MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
.setConfiguration(configuration)
.setNumSlotsPerTaskManager(numSlotsPerTaskManager)
.build();

if (LOG.isInfoEnabled()) {
LOG.info("Running job on local embedded Flink mini cluster");
}
// 提交到最小的cluster去执行程序
MiniCluster miniCluster = new MiniCluster(cfg);

try {
// 启动集群,包括启动JobMaster,进行leader选举等等
miniCluster.start();
configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());
// 提交任务到JobMaster
return miniCluster.executeJobBlocking(jobGraph);
}
finally {
transformations.clear();
miniCluster.close();
}
}

我们再看一下这个提交JobMaster的方法,跟进去看一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
Preconditions.checkNotNull(job, "job is null");
// 在这里,我们提交了我们的jobMaster
CompletableFuture<JobSubmissionResult> submissionFuture = this.submitJob(job);
CompletableFuture jobResultFuture = submissionFuture.thenCompose((ignored) -> {
return this.requestJobResult(job.getJobID());
});

JobResult jobResult;
try {
jobResult = (JobResult)jobResultFuture.get();
} catch (ExecutionException var7) {
throw new JobExecutionException(job.getJobID(), "Could not retrieve JobResult.", ExceptionUtils.stripExecutionException(var7));
}

try {
return jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader());
} catch (ClassNotFoundException | IOException var6) {
throw new JobExecutionException(job.getJobID(), var6);
}
}

现在找到了这个提交的方法 submitJob(),我们再进去瞅瞅:

1
2
3
4
5
6
7
8
9
10
11
12
13
public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = this.getDispatcherGatewayFuture();
jobGraph.setAllowQueuedScheduling(true);
CompletableFuture<InetSocketAddress> blobServerAddressFuture = this.createBlobServerAddress(dispatcherGatewayFuture);
CompletableFuture<Void> jarUploadFuture = this.uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);
CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture.thenCombine(dispatcherGatewayFuture, (ack, dispatcherGateway) -> {
// 这里就是提交吧
return dispatcherGateway.submitJob(jobGraph, this.rpcTimeout);
}).thenCompose(Function.identity());
return acknowledgeCompletableFuture.thenApply((ignored) -> {
return new JobSubmissionResult(jobGraph.getJobID());
});
}

这里的Dispatcher 是一个接收job,然后指派JobMaster去启动任务的类,我们看看他的这个实现类,在本地下启动的是 MiniDispatcher , 在集群上提交任务时,集群上启动的是 StandaloneDispatcher.

这里的Dispatcher启动了一个JobManagerRunner , 委托JobManagerRunner去启动该Job的JobMater。

这是打赏的地方...

本文标题:大数据计算引擎之Flink源码阅读(1) WordCount

文章作者:Mr.Sun

发布时间:2019年12月13日 - 10:16:53

最后更新:2020年06月15日 - 10:05:35

原始链接:http://www.blog.sun-iot.xyz/posts/238cb99a

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

---------Thanks for your attention---------