0%

聊聊Flink中自定义的DataSink的那些事(1)JDBCInputFormat篇

笔者之前也是没想过自己会重写JDBCInputformat类的,笔者在看这个JDBCInputFormat的时候,其实官方写的就挺好的了,但是感觉在应对一些数据量大的场景下会有点不适用。至少是不适用笔者的环境的。所以只好重写一个JDBCInputformat的连接类了。至少这个连接要加一个连接池吧,这已经是最低要求了。

对JDBCInputFormat的阅读来自于:
聊聊flink jdbc的ParameterValuesProvider
作者:go4it
链接:https://juejin.im/post/5cbeaf665188250a9c355f49
来源:掘金

阅读JDBCInputFormat

首先,我们自然是要阅读一下这个JDBCInputFormat这里面的方法,以及和其他类的依赖关系的。我们可以看这张图,很清楚的看到依赖关系以及方法了。所以其他的也就不必要多说了。直接干就完事了。我们把这个里面的方法。涉及到的类简单的看一遍

JDBCInputFormat.java

JDBCInputFormat.png

  • JDBCInputFormat继承了RichInputFormat,同时实现了ResultTypeQueryable接口

  • createInputSplits方法会根据parameterValues来创建GenericInputSplit数组,如果parameterValues为null则默认创建的totalNumberOfPartitions为1

  • getInputSplitAssigner方法根据InputSplit数组创建了DefaultInputSplitAssigner;getStatistics方法返回的是方法参数cachedStatistics

  • openInputFormat方法主要是获取数据库连接,准备好statement;closeInputFormat方法主要是关闭statement以及关闭数据库连接

  • open方法接收inputSplit,其主要是根据inputSplit从parameterValues提取查询参数,并设置到statement,之后执行statement.executeQuery()获取resultSet;nextRecord方法主要是遍历resultSet读取数据;close方法主要是关闭resultSet

NumericBetweenParametersProvider.java

NumericBetweenParametersProvider.png

NumericBetweenParametersProvider为基于numeric主键的范围查询(WHERE id BETWEEN ? AND ?)自动生成了分段的参数,其构造器要求输入每次的fetchSize、最小值minVal、最大值maxVal;getParameterValues方法会根据这几个值计算numBatches,然后计算好分段的参数值。

InputSplit.java

InputSplit接口定义了getSplitNumber方法用于返回当前input的split number;

GenericInputSplit.java

GenericInputSplit.png
GenericInputSplit实现了InputSplit接口,其getSplitNumber方法返回的是partitionNumber;

InputSplitAssigner.java

InputSplitAssigner.png

  • InputSplitAssigner接口定义了getNextInputSplit方法,其方法接收两个参数分别是host及taskId,该方法用于返回下一个inputSplit;

  • InputSplitAssigner接口定义了returnInputSplit方法,其方法接收两个参数分别是splits集合及taskId,该方法主要用于如果任务无法处理拆分,则将拆分返回给分配器;

DefaultInputSplitAssigner.java

DefaultInputSplitAssigner.png

  • DefaultInputSplitAssigner是InputSplitAssigner的默认实现,其getNextInputSplit方法会使用synchronized修改splits值,移除最后一个元素;

InputFormatSourceFunction.java

InputFormatSourceFunction.png

  • InputFormatSourceFunction的splitIterator的hasNext()方法会使用provider.getNextInputSplit(getRuntimeContext().getUserCodeClassLoader())来获取nextInputSplit,其provider为RpcInputSplitProvider;

InputSplitProvider.java

  • InputSplitProvider接口定义了getNextInputSplit方法,用于给每个task调用获取它要处理的inputSplit;

RpcInputSplitProvider.java

RpcInputSplitProvider.png

  • RpcInputSplitProvider的getNextInputSplit方法主要是通过jobMasterGateway.requestNextInputSplit,像jobMaster请求nextInputSplit;

JobMaster.requestNextInputSplit

  • JobMaster的requestNextInputSplit方法会通过splitAssigner.getNextInputSplit(host, taskId)来获取nextInputSplit,然后返回给请求的RpcInputSplitProvider

总结

  1. ParameterValuesProvider接口定义了getParameterValues方法,用于返回并行表查询所需的参数,该参数主要是用于将一个大的sql查询分为几个分段查询用于并行处理;它有两个实现类分别是GenericParameterValuesProvider及NumericBetweenParametersProvider

  2. GenericParameterValuesProvider实际上没有做其他事情,它实现的getParameterValues方法返回的值是构造器要求输入的;NumericBetweenParametersProvider为基于numeric主键的范围查询(WHERE id BETWEEN ? AND ?)自动生成了分段的参数,其构造器要求输入每次的fetchSize、最小值minVal、最大值maxVal;getParameterValues方法会根据这几个值计算numBatches,然后计算好分段的参数值

  3. JDBCInputFormat继承了RichInputFormat,同时实现了ResultTypeQueryable接口;createInputSplits方法会根据parameterValues来创建GenericInputSplit数组,如果parameterValues为null则默认创建的totalNumberOfPartitions为1;getInputSplitAssigner方法根据InputSplit数组创建了DefaultInputSplitAssigner;getStatistics方法返回的是方法参数cachedStatistics;openInputFormat方法主要是获取数据库连接,准备好statement;closeInputFormat方法主要是关闭statement以及关闭数据库连接;open方法接收inputSplit,其主要是根据inputSplit从parameterValues提取查询参数,并设置到statement,之后执行statement.executeQuery()获取resultSet;nextRecord方法主要是遍历resultSet读取数据;close方法主要是关闭resultSet

  4. InputSplit接口定义了getSplitNumber方法用于返回当前input的split number;GenericInputSplit实现了InputSplit接口,其getSplitNumber方法返回的是partitionNumber;InputSplitAssigner接口定义了getNextInputSplit方法,其方法接收两个参数分别是host及taskId,该方法用于返回下一个inputSplit;DefaultInputSplitAssigner是InputSplitAssigner的默认实现,其getNextInputSplit方法会使用synchronized修改splits值,移除最后一个元素

  5. InputFormatSourceFunction的splitIterator的hasNext()方法会使用provider.getNextInputSplit(getRuntimeContext().getUserCodeClassLoader())来获取nextInputSplit,其provider为RpcInputSplitProvider;InputSplitProvider接口定义了getNextInputSplit方法,用于给每个task调用获取它要处理的inputSplit;RpcInputSplitProvider的getNextInputSplit方法主要是通过jobMasterGateway.requestNextInputSplit,像jobMaster请求nextInputSplit;JobMaster的requestNextInputSplit方法会通过splitAssigner.getNextInputSplit(host, taskId)来获取nextInputSplit,然后返回给请求的RpcInputSplitProvider

  6. 看完上面的内容,我已经放弃了,再见各位。这就是一个源码的阅读而已.

这是打赏的地方...

本文标题:聊聊Flink中自定义的DataSink的那些事(1)JDBCInputFormat篇

文章作者:Mr.Sun

发布时间:2019年12月04日 - 15:18:26

最后更新:2020年06月15日 - 10:05:35

原始链接:http://www.blog.sun-iot.xyz/posts/6e31bc0

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

---------Thanks for your attention---------