-- Group-window aggregation: total amount per user over 1-day tumbling windows on rowtime.
-- `user` is backtick-quoted because USER is a reserved keyword in Flink SQL.
SELECT
    `user`,
    SUM(amount)
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), `user`
Over Window aggregation
-- Over-window aggregation: for each row, count amount over that row and the two
-- preceding rows of the same user, ordered by proctime.
-- `user` is backtick-quoted because USER is a reserved keyword in Flink SQL.
SELECT COUNT(amount) OVER (
    PARTITION BY `user`
    ORDER BY proctime
    ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM Orders;

-- The same window defined once in a WINDOW clause and shared by two aggregates.
SELECT
    COUNT(amount) OVER w,
    SUM(amount) OVER w
FROM Orders
WINDOW w AS (
    PARTITION BY `user`
    ORDER BY proctime
    ROWS BETWEEN 2 PRECEDING AND CURRENT ROW);
Distinct
-- Distinct values of the users column of Orders.
SELECT DISTINCT
    users
FROM Orders
Grouping sets, Rollup, Cube
-- Grouping sets: sum amount once per user group and once per product group.
-- `user` is backtick-quoted because USER is a reserved keyword in Flink SQL.
SELECT SUM(amount)
FROM Orders
GROUP BY GROUPING SETS ((`user`), (product))
User-defined Aggregate Functions (UDAGG)
-- Apply the user-defined aggregate function MyAggregate to amount, per users group.
SELECT MyAggregate(amount)
FROM Orders
GROUP BY users
Having
-- Total amount per users group, keeping only groups whose total exceeds 50.
SELECT SUM(amount)
FROM Orders
GROUP BY users
HAVING SUM(amount) > 50
Joins
Operation
Description
Inner Equi-join
-- Inner equi-join: only order/product pairs where productId matches id are returned.
SELECT *
FROM Orders
INNER JOIN Product
    ON Orders.productId = Product.id
Outer Equi-join
-- Left outer equi-join: every order; Product columns are NULL when no product matches.
SELECT *
FROM Orders
LEFT JOIN Product
    ON Orders.productId = Product.id;

-- Right outer equi-join: every product; Orders columns are NULL when no order matches.
SELECT *
FROM Orders
RIGHT JOIN Product
    ON Orders.productId = Product.id;

-- Full outer equi-join: all rows from both sides, NULL-padded where unmatched.
SELECT *
FROM Orders
FULL OUTER JOIN Product
    ON Orders.productId = Product.id;
Time-windowed Join
-- Time-windowed join: pair each order with shipments of that order whose shiptime
-- lies between ordertime and ordertime + 4 hours (both bounds inclusive).
SELECT *
FROM Orders AS o
INNER JOIN Shipments AS s
    ON o.id = s.orderId
    AND o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
Expanding arrays into a relation
-- Expand the tags array of each order into one row per element, exposed as column tag.
SELECT
    users,
    tag
FROM Orders
CROSS JOIN UNNEST(tags) AS t(tag)
Join with Table Function (UDTF)
-- Inner join with a table function: a row of the left (outer) table is dropped
-- when its table function call returns an empty result.
SELECT
    users,
    tag
FROM Orders,
    LATERAL TABLE(unnest_udtf(tags)) AS t(tag);

-- Left outer join with a table function: when the table function call returns an
-- empty result, the corresponding outer row is kept and the result is padded with NULLs.
SELECT
    users,
    tag
FROM Orders
LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) AS t(tag) ON TRUE;
Join with Temporal Table Function
-- A temporal table is a table that tracks changes over time.
-- Assuming Rates is a temporal table function, the join can be expressed in SQL as:
SELECT
    o_amount,
    r_rate
FROM Orders,
    LATERAL TABLE (Rates(o_proctime))
WHERE r_currency = o_currency
Join with Temporal Table
-- A temporal table is a table that tracks changes over time. It gives access to
-- the version of the table at a specific point in time (here: o.proctime).
-- Only inner and left joins with processing-time temporal tables are supported.
SELECT
    o.amount,
    o.currency,
    r.rate,
    o.amount * r.rate
FROM Orders AS o
JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
    ON r.currency = o.currency
Set Operations
Operation
Description
Union
-- UNION: users returned by either query, with duplicate rows removed.
-- `user` is backtick-quoted because USER is a reserved keyword in Flink SQL.
SELECT *
FROM (
    (SELECT `user` FROM Orders WHERE a % 2 = 0)
    UNION
    (SELECT `user` FROM Orders WHERE b = 0)
)
Union All
-- UNION ALL: users returned by either query, keeping duplicate rows.
-- `user` is backtick-quoted because USER is a reserved keyword in Flink SQL.
SELECT *
FROM (
    (SELECT `user` FROM Orders WHERE a % 2 = 0)
    UNION ALL
    (SELECT `user` FROM Orders WHERE b = 0)
)
Intersect / Except
-- INTERSECT: users returned by both queries.
-- `user` is backtick-quoted because USER is a reserved keyword in Flink SQL.
SELECT *
FROM (
    (SELECT `user` FROM Orders WHERE a % 2 = 0)
    INTERSECT
    (SELECT `user` FROM Orders WHERE b = 0)
);

-- EXCEPT: users returned by the first query but not by the second.
SELECT *
FROM (
    (SELECT `user` FROM Orders WHERE a % 2 = 0)
    EXCEPT
    (SELECT `user` FROM Orders WHERE b = 0)
);
In
-- IN: orders whose product appears in NewProducts.
-- `user` is backtick-quoted because USER is a reserved keyword in Flink SQL.
SELECT
    `user`,
    amount
FROM Orders
WHERE product IN (
    SELECT product FROM NewProducts
)
Exists
-- EXISTS: orders for which NewProducts contains at least one row with the same product.
-- EXISTS is a unary predicate over a (here correlated) subquery, so the product match
-- goes inside the subquery; "product EXISTS (...)" is not valid SQL.
-- `user` is backtick-quoted because USER is a reserved keyword in Flink SQL.
SELECT
    o.`user`,
    o.amount
FROM Orders AS o
WHERE EXISTS (
    SELECT 1
    FROM NewProducts AS np
    WHERE np.product = o.product
)
OrderBy & Limit
Operation
Description
Order By
-- All orders sorted by orderTime (ascending, the default sort direction).
SELECT *
FROM Orders
ORDER BY orderTime
Limit
-- The first 3 orders by orderTime.
SELECT *
FROM Orders
ORDER BY orderTime
LIMIT 3
Top-N
1 2 3 4 5 6 7
-- Top-N syntax template (square brackets mark placeholders / optional parts):
-- number the rows of each partition in the given order, then keep rownum <= N.
SELECT [column_list]
FROM (
    SELECT [column_list],
        ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
            ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
    FROM table_name)
WHERE rownum <= N [AND conditions]
// ingest a DataStream from an external source DataStream<Tuple3<String, String, String, Long>> ds = env.addSource(...); // register the DataStream as table "ShopSales" tableEnv.registerDataStream("ShopSales", ds, "product_id, category, product_name, sales");
// select top-5 products per category which have the maximum sales. Table result1 = tableEnv.sqlQuery( "SELECT * " + "FROM (" + " SELECT *," + " ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num" + " FROM ShopSales)" + "WHERE row_num <= 5");
Deduplication(重复数据删除)
1 2 3 4 5 6 7
-- Deduplication syntax template (square brackets mark placeholders / optional parts):
-- number the rows of each key by a time attribute and keep only the first (rownum = 1).
SELECT [column_list]
FROM (
    SELECT [column_list],
        ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
            ORDER BY time_attr [asc|desc]) AS rownum
    FROM table_name)
WHERE rownum = 1
e.g.
1 2 3 4 5 6 7 8
// Register the DataStream ds as table "Orders" with the listed fields.
tableEnv.registerDataStream("Orders", ds, "order_id, user, product, number, proctime.proctime");

// Deduplicate on order_id: number the rows of each order_id by ascending proctime
// and keep only the first one (row_num = 1).
// In the SQL text `user` is backtick-quoted because USER is a reserved keyword in Flink SQL.
Table result1 = tableEnv.sqlQuery(
  "SELECT order_id, `user`, product, number " +
  "FROM (" +
  "   SELECT *," +
  "       ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) as row_num" +
  "   FROM Orders) " +
  "WHERE row_num = 1");
Insert
Operation
Description
Insert Into
-- An INSERT INTO statement can only be submitted through the sqlUpdate method;
-- it writes the query result out to the target table (here OutputTable).
INSERT INTO OutputTable
SELECT
    users,
    tag
FROM Orders
publicvoideval(String str){ for (String s : str.split(separator)) { // use collect(...) to emit a row collect(new Tuple2<String, Integer>(s, s.length())); } } }
// Register the function. tableEnv.registerFunction("split", new Split("#"));
// Use the table function in the Java Table API. "as" specifies the field names of the table. myTable.joinLateral("split(a) as (word, length)") .select("a, word, length"); myTable.leftOuterJoinLateral("split(a) as (word, length)") .select("a, word, length");
// Use the table function in SQL with LATERAL and TABLE keywords. // CROSS JOIN a table function (equivalent to "join" in Table API). tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API). tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
publicclassCustomTypeSplitextendsTableFunction<Row> { publicvoideval(String str){ for (String s : str.split(" ")) { Row row = new Row(2); row.setField(0, s); row.setField(1, s.length()); collect(row); } }
@Override public TypeInformation<Row> getResultType(){ return Types.ROW(Types.STRING(), Types.INT()); } }