
Big Data Compute Engines: Flink SQL

A Brief Introduction to Flink SQL

  • Flexible, rich syntax;
  • Applicable to most computation scenarios;
  • Under the hood, Flink SQL uses the Apache Calcite framework to parse standard SQL statements and translate them into the underlying operator logic, applying rule-based optimizations during the translation;
  • It hides low-level technical details, making it easier and more efficient to build Flink applications with SQL statements;
  • Flink SQL sits on top of the `Table API` and covers most of the `Table API`'s features;
  • SQL and the `Table API` can be mixed; Flink ultimately merges everything into a single unified code path;
  • A single set of SQL code can be applied to both streaming and batch computation over the same data structures, without any adjustment to the SQL statements, thus achieving stream-batch unification.

Flink SQL Example

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);

// convert the DataStream into a Table with named fields
Table table = tableEnv.fromDataStream(ds, "user, product, amount");
Table result = tableEnv.sqlQuery(
    "SELECT SUM(amount) FROM " + table + " WHERE product LIKE '%Rubber%'");

// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, "user, product, amount");

Table result2 = tableEnv.sqlQuery(
    "SELECT product, amount " +
    "FROM Orders " +
    "WHERE product LIKE '%Rubber%'");

// register a TableSink and write query results to it
TableSink csvSink = new CsvTableSink("/path/to/file", ...);
String[] fieldNames = {"product", "amount"};
TypeInformation[] fieldTypes = {Types.STRING, Types.INT};
tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink);
tableEnv.sqlUpdate(
    "INSERT INTO RubberOrders " +
    "SELECT product, amount " +
    "FROM Orders " +
    "WHERE product LIKE '%Rubber%'");

Operations

Show and Use

  • Show: show catalogs / show databases / show tables
  • Use: use catalog mycatalog / use database

Scan, Projection and Filter

  • Scan / Select / As: SELECT * FROM Orders
    SELECT a, c AS d FROM Orders
  • Where / Filter: SELECT * FROM Orders WHERE b = 'red'
    SELECT * FROM Orders WHERE a % 2 = 0
  • User-defined Scalar Functions (Scalar UDF): SELECT PRETTY_PRINT(user) FROM Orders

Aggregations

  • GroupBy Aggregation: SELECT a, SUM(b) AS d FROM Orders GROUP BY a
  • GroupBy Window Aggregation: SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user
  • Over Window Aggregation: SELECT COUNT(amount) OVER (PARTITION BY user ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM Orders
    SELECT COUNT(amount) OVER w, SUM(amount) OVER w FROM Orders WINDOW w AS (PARTITION BY user ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
  • Distinct: SELECT DISTINCT users FROM Orders
  • Grouping Sets, Rollup, Cube: SELECT SUM(amount) FROM Orders GROUP BY GROUPING SETS ((user), (product))
  • User-defined Aggregate Functions (UDAGG): SELECT MyAggregate(amount) FROM Orders GROUP BY users
  • Having: SELECT SUM(amount) FROM Orders GROUP BY users HAVING SUM(amount) > 50
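As a sanity check of the tumbling-window aggregation semantics, here is a plain-Java sketch (not Flink code; the class and method names are made up for illustration) of what GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user computes: each row is assigned to the day-sized, non-overlapping bucket containing its rowtime, and amounts are summed per (bucket, user).

```java
import java.util.*;

public class TumbleSketch {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    /** Each row is {rowtime, user, amount}; returns "bucketStart/user" -> SUM(amount). */
    public static Map<String, Long> tumbleSum(List<Object[]> rows) {
        Map<String, Long> result = new TreeMap<>();
        for (Object[] row : rows) {
            long rowtime = (Long) row[0];
            String user = (String) row[1];
            long amount = (Long) row[2];
            // tumbling window: aligned, fixed-size, non-overlapping buckets
            long bucketStart = (rowtime / DAY_MS) * DAY_MS;
            result.merge(bucketStart + "/" + user, amount, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Object[]> rows = Arrays.asList(
            new Object[]{1000L, "alice", 3L},           // first day's bucket
            new Object[]{2000L, "alice", 4L},           // same bucket, same user -> summed
            new Object[]{DAY_MS + 500L, "alice", 5L});  // next day's bucket
        System.out.println(tumbleSum(rows)); // {0/alice=7, 86400000/alice=5}
    }
}
```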

Joins

  • Inner Equi-join: SELECT * FROM Orders INNER JOIN Product ON Orders.productId = Product.id
  • Outer Equi-join: SELECT * FROM Orders LEFT JOIN Product ON Orders.productId = Product.id
    SELECT * FROM Orders RIGHT JOIN Product ON Orders.productId = Product.id
    SELECT * FROM Orders FULL OUTER JOIN Product ON Orders.productId = Product.id
  • Time-windowed Join: SELECT * FROM Orders o, Shipments s WHERE o.id = s.orderId AND o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
  • Expanding arrays into a relation: SELECT users, tag FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
  • Join with Table Function (UDTF): inner join — a row of the left (outer) table is dropped if its table function call returns an empty result.
    SELECT users, tag FROM Orders, LATERAL TABLE(unnest_udtf(tags)) t AS tag
    Outer join — if the table function call returns an empty result, the corresponding outer row is kept and the result is padded with null values.
    SELECT users, tag FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE
  • Join with Temporal Table Function: a temporal table tracks changes over time. Assuming Rates is a temporal table function, the join can be expressed in SQL as:
    SELECT o_amount, r_rate FROM Orders, LATERAL TABLE (Rates(o_proctime)) WHERE r_currency = o_currency
  • Join with Temporal Table: a temporal table tracks changes over time and provides access to its versions at specific points in time. Only inner and left joins with processing-time temporal tables are supported.
    SELECT o.amount, o.currency, r.rate, o.amount * r.rate
    FROM Orders AS o JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
    ON r.currency = o.currency
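The time-windowed join condition above can be spelled out as a plain-Java predicate (not Flink code; the class and parameter names are hypothetical): an Orders row matches a Shipments row when the ids agree and the order happened within the four hours up to and including the ship time.

```java
public class IntervalJoinSketch {
    static final long FOUR_HOURS_MS = 4L * 60 * 60 * 1000;

    /** o.id = s.orderId AND o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime */
    public static boolean matches(long orderId, long orderTime, long shipOrderId, long shipTime) {
        return orderId == shipOrderId
            && orderTime >= shipTime - FOUR_HOURS_MS  // lower bound of BETWEEN
            && orderTime <= shipTime;                 // upper bound of BETWEEN
    }

    public static void main(String[] args) {
        System.out.println(matches(1, 1_000, 1, 10_000));                        // true: within 4 hours
        System.out.println(matches(1, 1_000, 1, 1_000 + FOUR_HOURS_MS + 1));     // false: just outside
    }
}
```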

Set Operations

  • Union:
    SELECT * FROM (
        (SELECT user FROM Orders WHERE a % 2 = 0)
        UNION
        (SELECT user FROM Orders WHERE b = 0)
    )
  • UnionAll:
    SELECT * FROM (
        (SELECT user FROM Orders WHERE a % 2 = 0)
        UNION ALL
        (SELECT user FROM Orders WHERE b = 0)
    )
  • Intersect / Except:
    SELECT * FROM (
        (SELECT user FROM Orders WHERE a % 2 = 0)
        INTERSECT
        (SELECT user FROM Orders WHERE b = 0)
    )
    SELECT * FROM (
        (SELECT user FROM Orders WHERE a % 2 = 0)
        EXCEPT
        (SELECT user FROM Orders WHERE b = 0)
    )
  • In:
    SELECT user, amount FROM Orders
    WHERE product IN (SELECT product FROM NewProducts)
  • Exists:
    SELECT user, amount FROM Orders
    WHERE EXISTS (SELECT product FROM NewProducts)
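To make the differences between these set operations concrete, here is a plain-Java sketch (names are made up; this is not Flink code) of their semantics over lists of rows: UNION deduplicates, UNION ALL keeps duplicates, INTERSECT keeps common rows, and EXCEPT subtracts one side from the other.

```java
import java.util.*;

public class SetOpsSketch {
    /** UNION: concatenate, then deduplicate. */
    public static List<String> union(List<String> a, List<String> b) {
        Set<String> out = new LinkedHashSet<>(a);
        out.addAll(b);
        return new ArrayList<>(out);
    }

    /** UNION ALL: concatenate, keeping duplicates. */
    public static List<String> unionAll(List<String> a, List<String> b) {
        List<String> out = new ArrayList<>(a);
        out.addAll(b);
        return out;
    }

    /** INTERSECT: distinct rows appearing on both sides. */
    public static List<String> intersect(List<String> a, List<String> b) {
        Set<String> out = new LinkedHashSet<>(a);
        out.retainAll(new HashSet<>(b));
        return new ArrayList<>(out);
    }

    /** EXCEPT: distinct rows of the left side not on the right side. */
    public static List<String> except(List<String> a, List<String> b) {
        Set<String> out = new LinkedHashSet<>(a);
        out.removeAll(new HashSet<>(b));
        return new ArrayList<>(out);
    }

    public static void main(String[] args) {
        List<String> a = Arrays.asList("alice", "bob", "alice");
        List<String> b = Arrays.asList("bob", "carol");
        System.out.println(union(a, b));     // [alice, bob, carol]
        System.out.println(unionAll(a, b));  // [alice, bob, alice, bob, carol]
        System.out.println(intersect(a, b)); // [bob]
        System.out.println(except(a, b));    // [alice]
    }
}
```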

OrderBy & Limit

  • Order By: SELECT * FROM Orders ORDER BY orderTime
  • Limit: SELECT * FROM Orders ORDER BY orderTime LIMIT 3

Top-N

SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
FROM table_name)
WHERE rownum <= N [AND conditions]

e.g.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// ingest a DataStream from an external source
DataStream<Tuple4<String, String, String, Long>> ds = env.addSource(...);
// register the DataStream as table "ShopSales"
tableEnv.registerDataStream("ShopSales", ds, "product_id, category, product_name, sales");

// select top-5 products per category which have the maximum sales.
Table result1 = tableEnv.sqlQuery(
    "SELECT * " +
    "FROM (" +
    "  SELECT *," +
    "    ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) AS row_num" +
    "  FROM ShopSales) " +
    "WHERE row_num <= 5");
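The ROW_NUMBER-based Top-N pattern can be emulated in plain Java to check its semantics (this is a hypothetical sketch, not Flink code): within each category, sort the sales values descending and keep the rows whose rank is at most n.

```java
import java.util.*;

public class TopNSketch {
    /** rows: category -> sales values; returns category -> top-n sales (descending). */
    public static Map<String, List<Long>> topN(Map<String, List<Long>> rows, int n) {
        Map<String, List<Long>> out = new TreeMap<>();
        for (Map.Entry<String, List<Long>> e : rows.entrySet()) {
            List<Long> sorted = new ArrayList<>(e.getValue());
            sorted.sort(Comparator.reverseOrder());                              // ORDER BY sales DESC
            out.put(e.getKey(), sorted.subList(0, Math.min(n, sorted.size()))); // row_num <= n
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<Long>> rows = new HashMap<>();
        rows.put("toys", Arrays.asList(5L, 9L, 1L, 7L));
        System.out.println(topN(rows, 2)); // {toys=[9, 7]}
    }
}
```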

Deduplication

SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY time_attr [asc|desc]) AS rownum
FROM table_name)
WHERE rownum = 1

e.g.

tableEnv.registerDataStream("Orders", ds, "order_id, user, product, number, proctime.proctime");
Table result1 = tableEnv.sqlQuery(
    "SELECT order_id, user, product, number " +
    "FROM (" +
    "  SELECT *," +
    "    ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) AS row_num" +
    "  FROM Orders) " +
    "WHERE row_num = 1");
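The deduplication pattern above boils down to keeping only the first row per key in arrival order. A plain-Java sketch of that semantics (hypothetical names, not Flink code):

```java
import java.util.*;

public class DedupSketch {
    /** keys: order_ids in processing-time (arrival) order; returns the keys whose row is kept. */
    public static List<String> keepFirst(List<String> keys) {
        Set<String> seen = new HashSet<>();
        List<String> kept = new ArrayList<>();
        for (String key : keys) {
            if (seen.add(key)) {  // first occurrence, i.e. ROW_NUMBER() = 1 for this partition
                kept.add(key);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // later duplicates of o1 and o2 are dropped
        System.out.println(keepFirst(Arrays.asList("o1", "o2", "o1", "o3", "o2"))); // [o1, o2, o3]
    }
}
```

Ordering by proctime DESC instead would flip this to "keep the last row per key" (with the result updated as new rows arrive).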

Insert

  • Insert Into: an INSERT INTO statement can only be used with the sqlUpdate method; it writes a Table's results to a registered output table.
    INSERT INTO OutputTable SELECT users, tag FROM Orders

User-Defined Functions

Scalar Function

public class HashCode extends ScalarFunction {
    private int factor = 12;

    public HashCode(int factor) {
        this.factor = factor;
    }

    public int eval(String s) {
        return s.hashCode() * factor;
    }
}

BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

// register the function in the table environment
tableEnv.registerFunction("hashCode", new HashCode(10));

// use the user-defined function in the Table API
myTable.select("string, string.hashCode(), hashCode(string)");

// use the user-defined function in SQL
tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable");

Note: a user-defined scalar function must return a scalar value.

public static class TimestampModifier extends ScalarFunction {
    public long eval(long t) {
        return t % 1000;
    }

    public TypeInformation<?> getResultType(Class<?>[] signature) {
        return Types.SQL_TIMESTAMP;
    }
}

Note: for result types that cannot be determined automatically, override getResultType of ScalarFunction to declare how the output type should be derived.

Table Function

Unlike a Scalar Function, a Table Function takes one or more scalar fields as input and, after computation, returns any number of rows rather than a single scalar value; the result may contain one or more columns. In form, it looks more like a Table.

To define a table function, extend the TableFunction class from the org.apache.flink.table.functions package and implement its evaluation method, where all of the custom function's logic lives. Note that the method must be declared public and named eval; a TableFunction may overload eval. Before using a TableFunction, register it in the TableEnvironment and then invoke it with the LATERAL TABLE keyword; whether the statement ends with ON TRUE distinguishes a leftOuterJoin from a join.

public class Split extends TableFunction<Tuple2<String, Integer>> {
    private String separator = " ";

    public Split(String separator) {
        this.separator = separator;
    }

    public void eval(String str) {
        for (String s : str.split(separator)) {
            // use collect(...) to emit a row
            collect(new Tuple2<String, Integer>(s, s.length()));
        }
    }
}

BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
Table myTable = ... // table schema: [a: String]

// Register the function.
tableEnv.registerFunction("split", new Split("#"));

// Use the table function in the Java Table API. "as" specifies the field names of the table.
myTable.joinLateral("split(a) as (word, length)")
    .select("a, word, length");
myTable.leftOuterJoinLateral("split(a) as (word, length)")
    .select("a, word, length");

// Use the table function in SQL with LATERAL and TABLE keywords.
// CROSS JOIN a table function (equivalent to "join" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");

// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
public class CustomTypeSplit extends TableFunction<Row> {
    public void eval(String str) {
        for (String s : str.split(" ")) {
            Row row = new Row(2);
            row.setField(0, s);
            row.setField(1, s.length());
            collect(row);
        }
    }

    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW(Types.STRING(), Types.INT());
    }
}

Aggregation Function

The `Flink Table API` provides `User-Defined Aggregate Functions (UDAGGs)`, whose main purpose is to aggregate one or more rows into a single scalar value, for example computing the maximum or minimum `Value` per `Key` in a dataset. Defining one of these is rather tedious, though...
public static class WeightedAvgAccum {
    public long sum = 0;
    public int count = 0;
}

/**
 * Weighted Average user-defined aggregate function.
 */
public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {

    @Override
    public WeightedAvgAccum createAccumulator() {
        return new WeightedAvgAccum();
    }

    @Override
    public Long getValue(WeightedAvgAccum acc) {
        if (acc.count == 0) {
            return null;
        } else {
            return acc.sum / acc.count;
        }
    }

    public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {
        acc.sum += iValue * iWeight;
        acc.count += iWeight;
    }

    public void retract(WeightedAvgAccum acc, long iValue, int iWeight) {
        acc.sum -= iValue * iWeight;
        acc.count -= iWeight;
    }

    public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
        Iterator<WeightedAvgAccum> iter = it.iterator();
        while (iter.hasNext()) {
            WeightedAvgAccum a = iter.next();
            acc.count += a.count;
            acc.sum += a.sum;
        }
    }

    public void resetAccumulator(WeightedAvgAccum acc) {
        acc.count = 0;
        acc.sum = 0L;
    }
}

// register function
StreamTableEnvironment tEnv = ...
tEnv.registerFunction("wAvg", new WeightedAvg());

// use function
tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user");
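To see the accumulator contract in isolation, here is a plain-Java check of the WeightedAvg logic above (a hypothetical sketch stripped of Flink types): accumulate adds value * weight to the sum and the weight to the count, and getValue returns sum / count.

```java
public class WeightedAvgSketch {
    long sum = 0;
    int count = 0;

    /** Mirrors WeightedAvg.accumulate: fold one (value, weight) pair into the accumulator. */
    void accumulate(long value, int weight) {
        sum += value * weight;
        count += weight;
    }

    /** Mirrors WeightedAvg.getValue: the weighted average, or null for an empty accumulator. */
    Long getValue() {
        return count == 0 ? null : sum / count;
    }

    public static void main(String[] args) {
        WeightedAvgSketch acc = new WeightedAvgSketch();
        acc.accumulate(10, 1); // sum = 10, count = 1
        acc.accumulate(4, 2);  // sum = 18, count = 3
        System.out.println(acc.getValue()); // 6
    }
}
```

The separate retract and merge methods in the full example are what let Flink use the function over retractable streams and session windows respectively; they are inverses and unions of this same accumulate step.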