笔者之前也是没想过自己会重写JDBCInputformat类的,笔者在看这个JDBCInputFormat的时候,其实官方写的就挺好的了,但是感觉在应对一些数据量大的场景下会有点不适用。至少是不适用笔者的环境的。所以只好重写一个JDBCInputformat的连接类了。至少这个连接要加一个连接池吧,这已经是最低要求了。
对JDBCInputFormat的阅读来自于:
聊聊flink jdbc的ParameterValuesProvider
作者:go4it
链接:https://juejin.im/post/5cbeaf665188250a9c355f49
来源:掘金
阅读JDBCInputFormat
首先,我们自然是要阅读一下这个JDBCInputFormat这里面的方法,以及和其他类的依赖关系的。我们可以看这张图,很清楚的看到依赖关系以及方法了。所以其他的也就不必要多说了。直接干就完事了。我们把这个里面的方法。涉及到的类简单的看一遍
JDBCInputFormat.java
JDBCInputFormat继承了RichInputFormat,同时实现了ResultTypeQueryable接口
createInputSplits方法会根据parameterValues来创建GenericInputSplit数组,如果parameterValues为null则默认创建的totalNumberOfPartitions为1
getInputSplitAssigner方法根据InputSplit数组创建了DefaultInputSplitAssigner;getStatistics方法返回的是方法参数cachedStatistics
openInputFormat方法主要是获取数据库连接,准备好statement;closeInputFormat方法主要是关闭statement以及关闭数据库连接
open方法接收inputSplit,其主要是根据inputSplit从parameterValues提取查询参数,并设置到statement,之后执行statement.executeQuery()获取resultSet;nextRecord方法主要是遍历resultSet读取数据;close方法主要是关闭resultSet
NumericBetweenParametersProvider.java
NumericBetweenParametersProvider为基于numeric主键的范围查询(WHERE id BETWEEN ? AND ?)自动生成了分段的参数,其构造器要求输入每次的fetchSize、最小值minVal、最大值maxVal;getParameterValues方法会根据这几个值计算numBatches,然后计算好分段的参数值。
InputSplit.java
InputSplit接口定义了getSplitNumber方法用于返回当前input的split number;
GenericInputSplit.java
GenericInputSplit实现了InputSplit接口,其getSplitNumber方法返回的是partitionNumber;
InputSplitAssigner.java
InputSplitAssigner接口定义了getNextInputSplit方法,其方法接收两个参数分别是host及taskId,该方法用于返回下一个inputSplit;
InputSplitAssigner接口定义了returnInputSplit方法,其方法接收两个参数分别是splits集合及taskId,该方法主要用于如果任务无法处理拆分,则将拆分返回给分配器;
DefaultInputSplitAssigner.java
- DefaultInputSplitAssigner是InputSplitAssigner的默认实现,其getNextInputSplit方法会使用synchronized修改splits值,移除最后一个元素;
InputFormatSourceFunction.java
- InputFormatSourceFunction的splitIterator的hasNext()方法会使用provider.getNextInputSplit(getRuntimeContext().getUserCodeClassLoader())来获取nextInputSplit,其provider为RpcInputSplitProvider;
InputSplitProvider.java
- InputSplitProvider接口定义了getNextInputSplit方法,用于给每个task调用获取它要处理的inputSplit;
RpcInputSplitProvider.java
- RpcInputSplitProvider的getNextInputSplit方法主要是通过jobMasterGateway.requestNextInputSplit,像jobMaster请求nextInputSplit;
JobMaster.requestNextInputSplit
- JobMaster的requestNextInputSplit方法会通过splitAssigner.getNextInputSplit(host, taskId)来获取nextInputSplit,然后返回给请求的RpcInputSplitProvider
总结
ParameterValuesProvider接口定义了getParameterValues方法,用于返回并行表查询所需的参数,该参数主要是用于将一个大的sql查询分为几个分段查询用于并行处理;它有两个实现类分别是GenericParameterValuesProvider及NumericBetweenParametersProvider
GenericParameterValuesProvider实际上没有做其他事情,它实现的getParameterValues方法返回的值是构造器要求输入的;NumericBetweenParametersProvider为基于numeric主键的范围查询(WHERE id BETWEEN ? AND ?)自动生成了分段的参数,其构造器要求输入每次的fetchSize、最小值minVal、最大值maxVal;getParameterValues方法会根据这几个值计算numBatches,然后计算好分段的参数值
JDBCInputFormat继承了RichInputFormat,同时实现了ResultTypeQueryable接口;createInputSplits方法会根据parameterValues来创建GenericInputSplit数组,如果parameterValues为null则默认创建的totalNumberOfPartitions为1;getInputSplitAssigner方法根据InputSplit数组创建了DefaultInputSplitAssigner;getStatistics方法返回的是方法参数cachedStatistics;openInputFormat方法主要是获取数据库连接,准备好statement;closeInputFormat方法主要是关闭statement以及关闭数据库连接;open方法接收inputSplit,其主要是根据inputSplit从parameterValues提取查询参数,并设置到statement,之后执行statement.executeQuery()获取resultSet;nextRecord方法主要是遍历resultSet读取数据;close方法主要是关闭resultSet
InputSplit接口定义了getSplitNumber方法用于返回当前input的split number;GenericInputSplit实现了InputSplit接口,其getSplitNumber方法返回的是partitionNumber;InputSplitAssigner接口定义了getNextInputSplit方法,其方法接收两个参数分别是host及taskId,该方法用于返回下一个inputSplit;DefaultInputSplitAssigner是InputSplitAssigner的默认实现,其getNextInputSplit方法会使用synchronized修改splits值,移除最后一个元素
InputFormatSourceFunction的splitIterator的hasNext()方法会使用provider.getNextInputSplit(getRuntimeContext().getUserCodeClassLoader())来获取nextInputSplit,其provider为RpcInputSplitProvider;InputSplitProvider接口定义了getNextInputSplit方法,用于给每个task调用获取它要处理的inputSplit;RpcInputSplitProvider的getNextInputSplit方法主要是通过jobMasterGateway.requestNextInputSplit,像jobMaster请求nextInputSplit;JobMaster的requestNextInputSplit方法会通过splitAssigner.getNextInputSplit(host, taskId)来获取nextInputSplit,然后返回给请求的RpcInputSplitProvider
看完上面的内容,我已经放弃了,再见各位。这就是一个源码的阅读而已.