Over 聚合定义(支持 Batch\Streaming):可以理解为是一种特殊的滑动窗口聚合函数。那这里我们拿 Over 聚合 与 窗口聚合 做一个对比,其之间的最大不同之处在于:
GROUP BY是针对每个group产生一个aggregate value,而OVER则是对每一个row产生一个aggregate value
Over 聚合的语法总结如下:
SELECTagg_func(agg_col) OVER ([PARTITION BY col1[, col2, ...]]ORDER BY time_colrange_definition),...
FROM ...
其中:
可以在一个SELECT子句中定义多个OVER窗口聚合。但是,对于流查询,由于当前的限制,所有聚合的OVER窗口必须相同。
数据源
-- 源数据表
CREATE TABLE orders (order_id BIGINT,product BIGINT,amount BIGINT,order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.order_id.min' = '1','fields.order_id.max' = '2','fields.amount.min' = '1','fields.amount.max' = '10','fields.product.min' = '1','fields.product.max' = '2'
);-- 结果表
CREATE TABLE sink_table (product BIGINT,order_time TIMESTAMP(3),amount BIGINT,one_hour_prod_amount_sum BIGINT
) WITH ('connector' = 'print'
);
按照行数聚合就是数据行数的一个滑动窗口,比如下面案例,最新输出的一条数据的 sum 聚合结果就是最近 5 行数据的 amount 之和。
INSERT INTO sink_table
SELECT product, order_time, amount,SUM(amount) OVER (PARTITION BY productORDER BY order_time-- 标识统计范围是一个 product 的最近 5 行数据ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) AS one_hour_prod_amount_sum
FROM orders;
flink web控制台查看结果
按照时间区间聚合就是时间区间的一个滑动窗口,比如下面案例 1 小时的区间,最新输出的一条数据的 sum 聚合结果就是最近一小时数据的 amount 之和。
INSERT INTO sink_table
SELECT product, order_time, amount,SUM(amount) OVER (PARTITION BY productORDER BY order_time-- 标识统计范围是一个 product 的最近 1 小时的数据RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS one_hour_prod_amount_sum
FROM orders;
Flink SQL 支持了一种简化写法,如下案例:
SELECT order_id, order_time, amount,SUM(amount) OVER w AS sum_amount,AVG(amount) OVER w AS avg_amount
FROM orders
-- 使用下面子句,定义 Over Window
WINDOW w AS (PARTITION BY productORDER BY order_timeRANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)
参考:
hive开窗函数-- over()
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/over-agg/