老夫被安排写Flink也有几个月了,虽然一来就是写的FlinkSQL一块的内容,但是写的似乎都忘了Flink的主流那就是流处理了。正好自己的入门项目也不曾做个Flink的WordCount,再加上老夫最近想看看Flink的源码程序了,所以,就拿WordCount来走走Flink的流的流程。学习一下Flink的流处理。
本文参考字自 透过源码
先编写程序 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class WordCount { public static void main (String[] args) throws Exception { StreamEnv streamEnv = StreamEnv.builder().enableRestart().setParallelism(1 ).finish(); StreamExecutionEnvironment env = streamEnv.getEnv(); DataStream<String> socketTextStream = env.socketTextStream("hadoop103" , 5555 ); socketTextStream.flatMap(new WordFlatMapFunction()).setParallelism(1 ) .keyBy(0 ) .sum(1 ) .print() ; env.execute("test" ); } private static class WordFlatMapFunction implements FlatMapFunction <String , Tuple2 <String ,Integer >> { @Override public void flatMap (String word, Collector<Tuple2<String,Integer>> collector) throws Exception { collector.collect(new Tuple2(word , 1 )); } } }
这里的环境获取老夫是提前写了一个单例的,您也可以直接在这里去拿到环境变量。,这里呢,老夫也把简单写的这个单例拿出来:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 public class StreamEnv { private final StreamExecutionEnvironment env; private StreamTableEnvironment tableEnv; private Builder builder ; private StreamEnv (Builder builder) { env = StreamExecutionEnvironment.getExecutionEnvironment(); if (builder.RESTART){ env.setRestartStrategy(RestartStrategies.failureRateRestart( 3 , Time.of(5 , TimeUnit.MINUTES), Time.of(10 ,TimeUnit.SECONDS) )); } if (builder.parallelism != null ){ env.setParallelism(builder.parallelism); } env.enableCheckpointing(1000 , CheckpointingMode.EXACTLY_ONCE) ; this .builder = builder ; } public StreamExecutionEnvironment getEnv () { return env; } public StreamTableEnvironment getTableEnv () { if (tableEnv == null ) { tableEnv = StreamTableEnvironment.create(env); } return tableEnv; } public static Builder builder () { return new Builder(); } public static class Builder { private boolean RESTART = false ; private Integer parallelism ; public Builder enableRestart () { this .RESTART = true ; return this ; } public Builder setParallelism (int parallelism) { this .parallelism = parallelism ; return this ; } public StreamEnv finish () { return new StreamEnv(this ) ; } }
程序写好了,我们便在 hostname为hadoop103上开启我们的服务:
随后输入我们的word,可以看到我们的结果:
跟进程序,从环境开始 程序的启动从这句开始:
1 env = StreamExecutionEnvironment.getExecutionEnvironment();
这会返回一个我们Flink的流的执行环境,如果要返回批的执行环境就是如下:
1 env = ExecutionEnvironment.getExecutionEnvironment();
所谓的执行环境是针对整个Flink的程序的执行上下文的,在环境中会存在一些配置,例如,老夫这里设置的是否开启失败重启策略?是否设置多并行度等等,我们可以进去看看这个环境里面给我们哪些信息:
这是可以清除的看到,我们的StreamExecutionEnvironment在流中是一个父类,子类包括 LocalStreamEnvironment,RemoteStreamEnvironment,StreamPlanEnvironment,StreamContextEnvironment. 这张图给出了StreamExecutionEnvironment里面的方法,参数,配置等基本信息,图有点小,建议您自己去看看源码。 对于分布式流处理程序来说,我们在代码中定义了如:flatMap(),keyBy,sum()等,事实是是一种可以理解为声明的方式,旨在告诉我们程序,这里我使用了flatMap()算子,但是真正启动计算的代码不在这里,我们的Flink是一个懒加载过程,所以必须要有:env.execute()
去执行我们的Flink程序。由于我们是在本地运行程序,因此这里会返回一个LocalStreamEnvironment。
算子(Operator)的注册声明 上面老夫使用的算子,第一个就是 flatMap(new WordFlatMapFunction())的,我们跟进去一看便知:
1 2 3 4 5 6 7 8 9 10 public <R> SingleOutputStreamOperator<R> flatMap (FlatMapFunction<T, R> flatMapper) { TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper), getType(), Utils.getCallLocationName(), true ); return flatMap(flatMapper, outType); } public <R> SingleOutputStreamOperator<R> flatMap (FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) { return transform("Flat Map" , outputType, new StreamFlatMap<>(clean(flatMapper))); }
我们可以看到,这里首先是先拿到了一个flatMap算子的输出类型 outType然后又生成了一个Operator,这也符合Flink的流式计算的核心概念了。就是所谓的链式处理。数据流从一输入流一个个传递给Operator进行链式处理,最后到我们的输出。针对数据流的每一次处理就是一次 operator,这里 operator之间还可以组成一个chain来处理。 Flink如何看待用户的处理流程呢?官网有这么一个介绍: 抽象下来就是: 关于这里面的算子,我们看这个图,就可以知道 DataStreamSource -> SingleOutputStreamOperator -> DataStream的关系,以及相关的算子方法。
程序的执行 前面有提到所谓的算子只是一个声明,不代表你声明了就会去执行,Flink的执行与Spark相同,都属于那种懒加载的方式,所以我们需要自己去开启Flink程序的执行。
这里的源码看的有点蒙:看下源码
1 2 3 4 5 6 public JobExecutionResult execute (String jobName) throws Exception { Preconditions.checkNotNull(jobName, "Streaming Job name should not be null." ); return execute(getStreamGraph(jobName)); }
我既然给了一个jobName , 为什么还要去检查这个jobName是否为空呢?纠结
本地模式下的execute方法 这行代码主要做这几件事:
生成StreamGraph,表示流拓扑的类。 它包含为执行建立作业图所需的所有信息。
生成JobGraph,这个图是要交给Flink去生成Task的图
生成一系列的配置
将JobGraph和配置交给Flink集群去运行,本地运行会为我们生成一个最小的集群环境,也就是我们的本地环境,如果不是本地运行的话,还会把JAR文件通过网络发给其他节点;
以本地模式运行的话,可以看到启动过程,如启动性能度量,WEB模块,JobManager,ResourceManager,TaskManager等
启动任务,值得一提的是在启动任务之前,先启动了一个用户类加载器,这个类加载器可以用来做一些在运行时动态加载类的工作。
远程模式下(RemoteEnvironment) 的execute()方法
这里老夫还没有跟进看,就看别人写的吧
远程模式的程序执行更加有趣一点。第一步仍然是获取StreamGraph,然后调用executeRemotely方法进行远程执行。该方法首先创建一个用户代码加载器:
1 ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, globalClasspaths, getClass().getClassLoader());
然后创建一系列配置,交给Client对象:
1 2 3 4 5 6 7 8 9 ClusterClient client; try { client = new StandaloneClusterClient(configuration); client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled()); } } try { return client.run(streamGraph,jarFiles,globalClasspaths,usercodeClassLoader).getJobExecutionResult(); }
client 方法首先生成一个 JobGraph,然后传递给JobClient.关于Client、JobClient、JobManager到底谁管谁,可以看这个: 确切的说,JobClient负责以异步的方式和JobManager通信(Actor是scala的异步模块),具体的通信任务有JobClientActor完成。相对应的,JobManager的通信任务也有一个Actor完成。
1 2 3 4 5 6 7 8 JobListeningContext jobListeningContext = submitJob(actorSystem, config, highAvailabilityServices, jobGraph, timeout, sysoutLogUpdates, classLoader); return awaitJobResult(jobListeningContext);
可以看到,该方法阻塞在awaitJobResult方法上,并最终返回了一个JobListeningContext,透过这个Context可以得到程序运行的状态和结果.
两种环境基本就是这样子的,现在我们的程序执行了execute(),跟进看一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public JobExecutionResult execute (StreamGraph streamGraph) throws Exception { JobGraph jobGraph = streamGraph.getJobGraph(); jobGraph.setAllowQueuedScheduling(true ); Configuration configuration = new Configuration(); configuration.addAll(jobGraph.getJobConfiguration()); configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0" ); configuration.addAll(this .configuration); if (!configuration.contains(RestOptions.BIND_PORT)) { configuration.setString(RestOptions.BIND_PORT, "0" ); } int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism()); MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder() .setConfiguration(configuration) .setNumSlotsPerTaskManager(numSlotsPerTaskManager) .build(); if (LOG.isInfoEnabled()) { LOG.info("Running job on local embedded Flink mini cluster" ); } MiniCluster miniCluster = new MiniCluster(cfg); try { miniCluster.start(); configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort()); return miniCluster.executeJobBlocking(jobGraph); } finally { transformations.clear(); miniCluster.close(); } }
我们再看一下这个提交JobMaster的方法,跟进去看一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public JobExecutionResult executeJobBlocking (JobGraph job) throws JobExecutionException, InterruptedException { Preconditions.checkNotNull(job, "job is null" ); CompletableFuture<JobSubmissionResult> submissionFuture = this .submitJob(job); CompletableFuture jobResultFuture = submissionFuture.thenCompose((ignored) -> { return this .requestJobResult(job.getJobID()); }); JobResult jobResult; try { jobResult = (JobResult)jobResultFuture.get(); } catch (ExecutionException var7) { throw new JobExecutionException(job.getJobID(), "Could not retrieve JobResult." , ExceptionUtils.stripExecutionException(var7)); } try { return jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader()); } catch (ClassNotFoundException | IOException var6) { throw new JobExecutionException(job.getJobID(), var6); } }
现在找到了这个提交的方法 submitJob(),我们再进去瞅瞅:
1 2 3 4 5 6 7 8 9 10 11 12 13 public CompletableFuture<JobSubmissionResult> submitJob (JobGraph jobGraph) { CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = this .getDispatcherGatewayFuture(); jobGraph.setAllowQueuedScheduling(true ); CompletableFuture<InetSocketAddress> blobServerAddressFuture = this .createBlobServerAddress(dispatcherGatewayFuture); CompletableFuture<Void> jarUploadFuture = this .uploadAndSetJobFiles(blobServerAddressFuture, jobGraph); CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture.thenCombine(dispatcherGatewayFuture, (ack, dispatcherGateway) -> { return dispatcherGateway.submitJob(jobGraph, this .rpcTimeout); }).thenCompose(Function.identity()); return acknowledgeCompletableFuture.thenApply((ignored) -> { return new JobSubmissionResult(jobGraph.getJobID()); }); }
这里的Dispatcher 是一个接收job,然后指派JobMaster去启动任务的类,我们看看他的这个实现类,在本地下启动的是 MiniDispatcher , 在集群上提交任务时,集群上启动的是 StandaloneDispatcher.
这里的Dispatcher启动了一个JobManagerRunner , 委托JobManagerRunner去启动该Job的JobMater。