SQL Is All Your Need: Flink Dynamic SQL

本文最后更新于 2024年1月3日 晚上

实时监控是Flink一个重要且复杂的应用场景,所以一般不会只采用SQL去实现。但本篇文章我们将从一个简单的问题出发,挑战只使用SQL来满足逐渐复杂的需求,希望最终可以再次拓宽SQL的使用边界。

事先准备

  • 我们有日志流如下:
1
2
3
4
5
6
7
8
CREATE TABLE IF NOT EXISTS `dwd_log` (
`time` STRING COMMENT '事件时间',
`key_word` STRING COMMENT '日志标识',
`key1` INT COMMENT '指标',
`key2` STRING COMMENT '维度'
) WITH (
...
);
  • 数据示例如下:
1
{"time":"2023-04-09 15:40:05","key_word":"stdout","key1":5,"key2":"val1"}
  • 我们将根据一些规则从这个日志流中筛选出关注的事件。

阶段一:需求的开始

规则1:日志关键字为stdout每天不同维度key2下获得指标key1次数大于1的事件

  • 这个规则可以用简单的SQL实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
SELECT
1 AS `rule_id`,
DATE_FORMAT(`time`, 'yyyy-MM-dd 00:00:00') AS `window_start`,
`key2` AS `key`,
MAX(`time`) AS `alert_time`,
MAP['key1', CAST(COUNT(`key1`) AS DOUBLE)] AS `alert_metrics`
FROM
`dwd_log`
WHERE
`key_word` = 'stdout'
GROUP BY
`key2`,
DATE_FORMAT(`time`, 'yyyy-MM-dd 00:00:00')
HAVING
COUNT(`key1`) > 1;
  • 所以对这种单个简单的统计规则,用Flink SQL实现会非常方便,但当规则越来越多时呢?

阶段二:需求的增长

规则2:不同日志关键字stdout每小时去重维度key2数量大于2的事件

  • 当多个类似的需求出现时,最简单的方法是再起个作业计算,但我们这里想在一个作业里实现,可以直接将不同规则UNION起来:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
SELECT
1 AS `rule_id`,
DATE_FORMAT(`time`, 'yyyy-MM-dd 00:00:00') AS `window_start`,
`key2` AS `key`,
MAX(`time`) AS `alert_time`,
MAP['key1_cnt', CAST(COUNT(`key1`) AS DOUBLE)] AS `alert_metrics`
FROM
`dwd_log`
WHERE
`key_word` = 'stdout'
GROUP BY
`key2`,
DATE_FORMAT(`time`, 'yyyy-MM-dd 00:00:00')
HAVING
COUNT(`key1`) > 1
UNION ALL
SELECT
2 AS `rule_id`,
DATE_FORMAT(`time`, 'yyyy-MM-dd HH:00:00') AS `window_start`,
`key_word` AS `key`,
MAX(`time`) AS `alert_time`,
MAP['key2_cnt', CAST(COUNT(DISTINCT `key2`) AS DOUBLE)] AS `alert_metrics`
FROM
`dwd_log`
GROUP BY
`key_word`,
DATE_FORMAT(`time`, 'yyyy-MM-dd HH:00:00')
HAVING
COUNT(DISTINCT `key2`) > 2;
  • 虽然这样实现逻辑简单,但涉及重复消费,计算消耗大,且将规则硬编码在代码中,不够灵活。我们需要在达到临界点时之前找到更好的解决方法。

阶段三:需求的爆发

  • 我们看到当类似的需求增长后,将面临以下问题:
    • 重复消费导致的计算资源浪费
    • 合并后代码过长导致无法维护
    • 硬编码导致规则没有灵活调整
  • 所以这时需要重构SQL代码,找到一种不用改变SQL又能支持规则CRUD的方法
    • 仔细思考一下现有的规则1和规则2,发现它们都是由相同的部分组成
      • 过滤规则:对应WHERE条件
      • 分组规则:对应监控对象, 规则1中的key2字段以及规则2中的key_word字段
      • 窗口规则:对应监控周期,规则1中的以及规则2的小时
      • 聚合规则:对应聚合计算,规则1中的COUNT方法以及规则2中的COUNT DISTINCT方法
      • 阈值规则:对应HAVING条件
  • 这时我们可以抽象出一个规则模型,将规则放入另外一张(Paimon)表中维护
1
2
3
4
5
6
7
8
9
10
CREATE TABLE IF NOT EXISTS `dim_rule` (
`rule_id` INT COMMENT '规则id',
`job_id` STRING COMMENT '处理规则作业id',
`filter` STRING COMMENT '过滤规则',
`key` STRING COMMENT '分组规则',
`window` STRING COMMENT '窗口规则',
`aggregate` ROW<`name` STRING, `input` STRING, `method` STRING> COMMENT '聚合规则',
`threshold` STRING COMMENT '阈值规则',
PRIMARY KEY(`rule_id`) NOT ENFORCED
);
  • 规则示例为:
1
2
3
4
5
6
7
8
9
10
11
12
13
{
"rule_id": 1,
"job_id": "test",
"filter": "key_word=='stdout'", -- 实际为EXPR$0=='stdout'
"key": "key2", -- 实际为EXPR$2
"window": "DAY",
"aggregate": {
"name": "key1_cnt",
"input": "key1", -- 实际为EXPR$1
"method": "COUNT",
},
"threshold_rule": "key1_cnt>1"
}
  • 有了规则的抽象之后,我们就可以拿日志数据去关联规则,然后根据规则去做具体的处理,当然这里就需要引入几个UDF来增强SQL的表达能力(这里我们不去讨论UDF具体实现)
    • dynamic_key(UDF): 根据分组规则取日志中的相应值
    • dynamic_window(UDF):根据窗口规则划分日志到相应窗口,类似之前的DATE_FORMAT函数
    • dynamic_filter(UDF):根据过滤规则或阈值规则的表达式,判断表达式是否满足条件
    • dynamic_agg(UDAF):根据聚合规则中计算聚合值,最终输出指标名:值的映射
  • 重构后的代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
SELECT
`rule_id`,
`window_start`,
`key`,
MAX(`time`) AS `alert_time`,
-- 根据聚合规则进行计算
dynamic_agg(`data`, `aggregate`) AS `alert_metrics`
FROM
(
SELECT
`rule_id`,
dynamic_window(`time`, `window`) AS `window_start`,
dynamic_key(`data`, `key`) AS `key`,
`aggregate`,
`threshold`,
`time`,
-- 其中`data`为ROW(`key_word`, `key1`, `key2`),由于规则中是不确定的,需要可以访问到日志中每一个字段
`data`
FROM
`dwd_log` AS `log`
LEFT JOIN
-- Paimon规则维表查询
`dim_rule` FOR SYSTEM_TIME AS OF `log`.`proc_time` AS `rule`
-- 只关联作业相关规则
ON `rule`.`job_id` = 'test'
WHERE
-- 根据规则过滤日志
dynamic_filter(`data`, `filter`) AND `rule_id` IS NOT NULL
) `log_with_rule`
GROUP BY
-- 根据规则分组
`key`, `rule_id`, `window_start`, `threshold`, `aggregate`
HAVING
-- 根据阈值规则判断聚合结果是否超过阈值
dynamic_filter(dynamic_agg(`data`, `aggregate`), `threshold`);

注:在实现过程中发现无法重命名ROW类型中字段,所以在上述规则示例中实际需要指定默认名,即EXPR$0, EXPR$1, EXPR$2。大家也可以思考一下为什么不用MAP类型?

  • 经过这样的重构之后,SQL代码就固定了,所有操作只是针对规则的CRUD。但还会有什么其他问题吗?

阶段四:需求的挑战

  • 上述方法采用维表关联,每来一条数据都需要对维表进行全表扫描,当日志和规则量级增加后,会带来相应的性能问题。虽然可以采用异步、缓存等SQL HINTS缓解,但其实不是根本的解决方案。
  • 幸好Paimon提供了增量读的功能,我们可以将规则表全部读取到Flink中,并可以捕获规则表中的数据变更,这样就和数据源解耦了,不会带来额外的性能压力。这只需改变一行代码就可以实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
SELECT
...
FROM
(
SELECT
...
FROM
`dwd_log` AS `log`
LEFT JOIN
-- 修改为常规的左连接
`dim_rule` AS `rule`
ON `rule`.`job_id` = 'test'
...
) `log_with_rule`
...
  • 看起来去利用SQL进行复杂监控的目标达到了,但事实真的是这样吗?

真相

  • 其实我们还会碰见一系列棘手的问题:
    • 报警控制:SQL的实现周期内报警会反复触发报警,如需控制报警次数,只能下游再进行处理
    • 窗口类型:SQL的实现只能在指定周期内实时累积,不能定时轮询(比如每天触发一次)
    • 状态控制:SQL的实现当规则修改时没法细粒度地控制之前累积状态,作业在长期运行后会不堪重负等
  • 但实际工作中,我们大部分碰到的问题其实就是类似阶段一那样确定的规则,都可以利用SQL方便地进行解决。而从长远来看,上层的数据应用会变得更加简单,对于最终用户,所有的数据都可以使用SQL方式进行分析,这就是我理解的SQL Is All Your Need愿景。

总结

  • 在本篇文章中看到了SQL解决复杂问题潜力,但也明白SQL解决问题的局限性,但这并不影响我们的愿景, 而且在随着AI的发展,可以憧憬仅用自然语言处理数据的时代,Keep up with the times ~

本文所有SQL代码可参考:SQL IS ALL Your Need: Flink Dynamic SQL


SQL Is All Your Need: Flink Dynamic SQL
https://syntomic.cn/2023/06/15/SQL-Is-All-Your-Need-Flink-Dynamic-SQL/
作者
syntomic
发布于
2023年6月15日
更新于
2024年1月3日
许可协议