Flink规则引擎

本文最后更新于 2024年1月1日 晚上

随着业务发展,对风控能力的要求会越来越高,比如丰富的事件类型处理、不同的统计方式计算、动态的规则配置支持等。本篇文章我们就来讨论如何利用Flink构建一个规则引擎,去解决这些问题,来支持风控平台的建设。

风控业务

  • 类型
    • 事先风控:提前辨识异常,避免风险事件的发生。
    • 事中风控:实时识别异常,减少风险事件的影响。
    • 事后风控:总结分析异常,防止类似事件再次产生。
  • 方法
    • 基于规则
      • 统计规则:例如5分钟以内访问次数大于100次
      • 序列规则:例如用户点击、加入购物车、删除事件序列
    • 基于算法
  • 本篇文章只考虑利用统计规则做事中的风控:根据规则将实时数据源中异常事件筛选出来

文章所有代码见:RuleEngineJob

设计

  • 利用广播流广播规则到各个算子上,然后数据遍历相应规则进行处理输出。原始思路可见Flink官方博客

Flink作业

  • 利用参数划分作业处理规则的范围,一个作业只处理相同作业IDjob.id的规则。
  • 由于Flink序列化效率的差别, 所以一个作业只处理相同Schema的数据,这样的就能统一采用Row数据类型进行高效序列化。
1
2
3
4
5
6
7
8
9
10
{
"type": "record",
"name": "default",
"fields": [
{
"name": "rule_id",
"type": "int"
}
]
}
  • 同时如果原始数据源量级比较大,我们可以先统一合并读取拆分出关心的数据,减少公共层压力, 所以这里也顺便提供了纯ETL的功能,通过作业参数agg.enable = false设置。

规则

  • 我们提供下列配置项
    • 过滤条件: 根据计算表达式过滤满足条件的数据
    • 清洗条件
      • 正则解析
      • JSONPath解析
      • 字段表达式
    • 分组条件:根据关注对象进行分组计算
    • 窗口条件
      • 窗口类型:滚动/滑动/累积
      • 窗口触发:单次/批次
      • 窗口大小及偏移
    • 聚合条件
      • 聚合过滤:计算表达式
      • 聚合方法:SUM/COUNT/COUNT DISTINCT等
    • 阈值条件:利用聚合指标的计算表达式判断
    • 以及规则元数据
      • 规则ID:主键
      • 规则状态:控制规则生效状态
      • 作业ID: 匹配相应Flink作业
  • 例子:将数据进行过滤、解析、字段映射、窗口聚合计算、阈值判断的整体流程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
{
"rule_id": 100,
"rule_state": "ACTIVE",
"job_id": "test",
"filter": {
"expr": "string.contains(raw, 'stdout')",
"params": [
"raw"
]
},
"flat_map": {
"pattern": "(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}) (.*?) (.*)",
"normal_fields": [
{
"name": "time",
"type": "STRING",
"default": null
},
{
"name": "key_word",
"type": "STRING",
"default": null
},
{
"name": "",
"type": "JSON",
"json_paths": [
{
"name": "key1",
"expr": "$.key1",
"type": "INT",
"default": 0
},
{
"name": "key2",
"expr": "$.key2",
"type": "STRING",
"default": null
}
],
"default": null
}
],
"mapping_fields": [
{
"name": "is_odd",
"type": "BOOLEAN",
"expr": "key1 % 2 == 1",
"default": null
}
]
},
"keys": [
"is_odd"
],
"window": {
"type": "TUMBLE",
"trigger": "SINGLE",
"offset": 0,
"size": 86400000,
"step": 0
},
"aggregates": [
{
"name": "val_cnt",
"inputs": [
"key2"
],
"method": "COUNT_DISTINCT"
}
],
"threshold": {
"expr": "val_cnt > 1",
"params": [
"val_cnt"
]
}
}

实现

Connector

  • Source:通过自定义RowDeserializationSchema根据传入的Avro Schema进行数据规范化
  • Sink:根据TopicSelector(Kafka)根据关键字拆分到不同Topic

ETL算子

  • 规则流:根据规则状态更新广播状态
  • 数据流:遍历所有广播状态中规则进行处理
    • 引入高性能、轻量级Aviator表达式引擎提升表达能力
    • 如果下游需要聚合,根据rule_id + 按照keys字段取值进行分组。这样下游就相当于固定逻辑处理,减少代码复杂度。

注1:因为我们这里可以得到规则相应的聚合条件,所以可以在ETL算子中做预聚合减少下游数据量,提升吞吐量。

注2: 遍历规则会导致数据重复, 当规则过多时可能会产生性能问题。这里可以先合并相同过滤+窗口+聚合条件的规则, 减少处理压力。

AGG算子

  • Flink原生窗口算子不支持动态变更,所以我们需要设计重新窗口算子。
  • 通过阅读Flink源码可知窗口实现过程:
    • 通过WindowAssigner确定消息所在的窗口(可能属于多个窗口)
    • 将数据根据AggregateFunction聚合到对应窗口的状态中
    • 根据Trigger确定是否应该触发窗口结果的计算,如果使用 InternalWindowFunction 对窗口进行处理
    • 注册EventTimeTimer定时器,进行窗口触发计算及结束时清理窗口状态
    • 如果数据延迟到达,提交到SideOutput
  • 所以只要我们先从广播状态中根据当前分组拿到相应规则,就可以模拟窗口算子的逻辑,实现窗口的动态配置~
  • 这里需要注意一些问题:
    1. 窗口触发:因为需要处理数据和规则的双流输入,而Flink的Watermark是取得双流中最小的Watermark,所以这里我们需要定义规则流的Watermark为周期性触发的Long.MAX_VALUE,这样才不会影响数据流正常窗口触发计算。
    2. 状态清理:但规则移除后,肯定希望清理相应规则下的所有累积状态,要不然之后肯定会OOM。这里我们在移除广播流规则时,可以根据规则ID拿到规则相关的所有聚合状态进行删除。删除时也要注意并发问题,可以采用先复制相应键进行避免。
    3. 聚合计算:因为DataStream API中窗口是利用数据复制实现的,长时间周期,短步长的窗口类型会导致严重的性能问题。这里可以借鉴SQL API中Window slice实现。

注: 这里采用AbstractStreamOperator实现而不是标准的KeyedBroadcastProcessFunction实现是因为需要得到窗口触发时对应的窗口对象,需要利用底层状态的Namespace进行判断。

测试

  • 作为一个通用型的平台作业,所有需要编写相应的单元测试、算子测试、作业测试保证代码质量,具体操作可见Flink测试
    • 这里我们将source 和 sink 设置成可插拔的,可以在不改动代码的条件下实现作业测试。

业界实现

  • 社区提出动态CEP的提案, 希望提供动态CEP规则的支持,但暂时还没实现。
  • 不过业界有对应实现:
  • 我们提供的思路也可以去实现相应的动态CEP,不过感觉CEP的配置过于复杂,需要更好地设计前端平台,降低使用门槛是关键。

总结

本篇文章我们详细讨论了如何利用Flink构建规则引擎支持风控平台的建设,得到了一些构建复杂数据处理应用的经验,也更加深入理解了Flink处理数据的原理,向知其所以然迈向了坚实的一步~


Flink规则引擎
https://syntomic.cn/2023/07/15/Flink规则引擎/
作者
syntomic
发布于
2023年7月15日
更新于
2024年1月1日
许可协议