SQL Is All Your Need: Flink SQL UDF

本文最后更新于 2024年2月16日 下午

随着数据处理的逻辑变得越来越复杂,编写的SQL也会变得越加复杂,有时甚至会感觉SQL力不从心。这个时候就需要扩展SQL的表达能力,而UDF(用户自定义函数)就是这样一种扩张开发的机制,拓展系统的内置函数,实现自定义逻辑。本编文章我们就从具体场景出发,使用各种Flink UDF去优化或解决相关问题。

事前准备

  • UDF大致有以下几种
    • 数值函数(UDF): 将标量值转换成一个新标量值,如:DATE_FORMAT
    • 表值函数(UDTF): 将标量值转换成新的行数据, 如:EXPLODE
    • 聚合函数(UDAF):将多行数据里的标量值转换为成一个新标量值, 如:SUM
  • 不同计算引擎有相应的实现方法,本编文章我们只考虑Flink自定义UDF
    • 继承相应类:注意输出输入类型推导
    • 实现相应方法
    • 注册使用

注:文中不会给出详细代码,只会对一些重要的地方给予说明,详细实现可到代码库中查看。

ETL

文章SQL Is All Your Need: Flink SQL中我们提出了一个问题,如何简化ETL中重复的SQL代码,现在我们就用UDTF来实现这一目的。

1
2
3
4
5
6
7
8
9
INSERT INTO `dwd_log`
SELECT
REGEXP_EXTRACT(`log`, '(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (.*?) (.*)', 1) AS `time`,
REGEXP_EXTRACT(`log`, '(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (.*?) (.*)', 2) AS `key_word`,

JSON_VALUE(REGEXP_EXTRACT(`log`, '(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (.*?) (.*)', 3), '$.key1' RETURNING INT) AS `key1`,
JSON_VALUE(REGEXP_EXTRACT(`log`, '(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (.*?) (.*)', 3), '$.key2') AS `key2`
FROM
`ods_log`
  • 原因:上述SQL看起来冗长最重要的原因是需要先将每个字段用正则提取,然后再对字段进行相同JSON解析操作,导致了很多重复代码。
  • 解决:可以在Flink SQL中编写UDTF,与常规的标量函数只能返回一个值不同,它可以返回任意多行,且每一行可以包含多列。通过LATERAL算子将外表(算子左侧)的每一行跟表值函数返回的所有行(算子右侧)进行笛卡尔积,这样就一次性提取多个字段,那我们的代码就能简化很多:
1
2
3
4
5
6
7
8
CREATE TEMPORARY FUNCTION IF NOT EXISTS q_regex_extract AS 'cn.syntomic.qflink.sql.udf.table.QRegexExtract' LANGUAGE JAVA;

SELECT
`time`,
`key_word`,
`json`
FROM
`ods_log`, LATERAL TABLE(q_regex_extract(`log`, '(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (.*?) (.*)', 'time', 'key_word', 'json'))
  • 同样地,我们利用hive module使用hive函数一次性进行JSON解析
1
2
3
4
5
6
7
8
9
10
11
12
13
LOAD MODULE hive WITH ('hive-version' = '3.1.3');

SELECT
`time`,
`key_word`,
-- json_tuple return string type
CAST(`key1` AS INT) AS `key1`,
`key2`
FROM
(
-- above sql
...
) a, LATERAL TABLE(json_tuple(`json`, 'key1', 'key2')) AS b(`key1`, `key2`);
  • 这样我们就利用UDTF,减少了SQL中重复代码,形成统一的清洗逻辑,同时避免Flink重复计算的问题,对比之前的实现就看起来逻辑更加清晰,也更加简洁~

注: 我们再自定义函数中q_regex_extract没有别名就可以直接相应字段, 这是因为我们自定义了类型推导, 而在函数json_tuple中就必须显示别名相应字段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
return TypeInference.newBuilder()
.outputTypeStrategy(
callContext -> {
List<Field> fields = new ArrayList<>(argsLen - 2);

for (int i = 2; i < argsLen; i++) {
// use literal parameter as field name
String fieldName =
callContext
.getArgumentValue(i, String.class)
.orElse("f" + (i - 2));
fields.add(i - 2, DataTypes.FIELD(fieldName, DataTypes.STRING()));
}

return Optional.of(DataTypes.ROW(fields.toArray(new Field[0])));
})
.build();
}

Analysis

文章SQL Is All Your Need: Flink SQL中我们计算了每分钟滑动窗口的UV。现在我们把问题变复杂一点:每分钟计算一次当天截至到目前的累积UV

  • Flink SQL在1.13之后提供了累积窗口的计算方式:
1
2
3
4
5
6
7
8
SELECT
DATE_FORMAT(`window_start`, 'yyyy-MM-dd HH:mm:ss') AS `window_start`,
DATE_FORMAT(`window_end`, 'yyyy-MM-dd HH:mm:ss') AS `window_end`,
`key_word` AS `dim`,
COUNT(DISTINCT `key2`) AS `metric`
FROM
TABLE(CUMULATE(TABLE `dwd_log`, DESCRIPTOR(`rowtime`), INTERVAL '1' MINUTES, INTERVAL '1' DAYS))
GROUP BY `key_word`, `window_start`, `window_end`;
  • 但如果需要我们自己实现的时候,似乎比较难以实现?我们这里只考虑离线的情况,因为Flink(1.15)离线计算时暂不支持累计窗口去重计算。
  • 一种比较直接的想法是首先计算每天所有的时间窗口,然后与小于当前结束窗口的数据JOIN, 计算出相应时间窗口的基数。但这样计算会导致数据膨胀严重,当数据量大时会导致数据倾斜等问题。可以采取下列方法进行优化:
    • 累积窗口计算只需要当天分组下最早时间的记录,这样可以大大减少数据量。
    • 调整相应资源以及参数实现自适应执行。
  • 其实大数据场景中有很多基数估算的方法,这里我们可以引入HLL(HyperLogLog) —— 基数统计的概率算法,用另外一种方法实现
    • 对HLL感兴趣的同学可以参考文章神奇的HyperLogLog,以及线上演示Demo
    • HLL可以在1.2kb内存下估算高达1亿个元素,而只有2%的误差!
    • 我们可以使用HLL数据结构这样去计算当天每分钟的累积人数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
CREATE TEMPORARY FUNCTION IF NOT EXISTS hll_agg AS 'cn.syntomic.qflink.sql.udf.aggregate.HLLAggregate' LANGUAGE JAVA;
CREATE TEMPORARY FUNCTION IF NOT EXISTS hll_cardinality AS 'cn.syntomic.qflink.sql.udf.scalar.HLLCardinality' LANGUAGE JAVA;

SELECT
DATE_FORMAT(`min_time`, 'yyyy-MM-dd 00:00:00') AS `window_start`,
FROM_UNIXTIME(UNIX_TIMESTAMP(`min_time`) + 60) AS `window_end`,
`key_word` AS `dim`,
-- 按照每分钟时间排序,合并第一行到当前行每分钟的HLL结构,并估算其基数
hll_cardinality(hll_agg(min_hll) OVER (PARTITION BY `key_word` ORDER BY `min_time`)) AS `metric`
FROM
(
-- 聚合每分钟数据形成HLL数据结构
SELECT
DATE_FORMAT(`time`, 'yyyy-MM-dd HH:mm:00') AS `min_time`,
`key_word`,
hll_agg(`key2`) AS min_hll
FROM
`dwd_log`
GROUP BY
DATE_FORMAT(`time`, 'yyyy-MM-dd HH:mm:00'),
`key_word`
) a;
  • 在Flink中HLL不是预定义的数据结构, 所以累加器中需要将其视为RAW数据格式,利用KYRO序列化
1
2
3
4
5
public class HLLBuffer {

@DataTypeHint(allowRawGlobally = HintFlag.TRUE)
public HLL hll = new HLL(11, 5);
}
  • 这样我们就利用udf引入新的数据结构,扩展了SQL的表达能力~

Python UDF

有时我们需要结合Python生态去实现动态执行、模型预测等功能,这个时候就可以引入Python UDF解决。这里我们以经典的鸢尾花机器学习分类为例。

  • 原理
    • 进程模式:Python函数和Java算子之间采用Grpc服务通信
    • 线程模式:Python函数和Java算子运行在同一个进程,利用FFI通信
      • 这样可以减少进程间序列化的开销,提升性能
  • 实现:
    • 实现相应Python类,导入模型进行预测
1
2
3
4
5
6
7
8
9
10
class DemoUDF(ScalarFunction):

def open(self, function_context: FunctionContext):
self.model = joblib.load(path)

def eval(self, value):
return self.model.predict(np.array(value).reshape(1, -1)).tolist()[0]


demo_udf = udf(DemoUDF(), result_type=DataTypes.INT())
  • 注册Python UDF
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
SET 'python.files'='./qflink-python/src/main/python/q-pyflink/udf';
-- 保证python算子获得足够资源
SET 'pipeline.operator-chaining' = 'false';
SET 'python.execution-mode'='thread';

CREATE TEMPORARY FUNCTION IF NOT EXISTS demo_udf AS 'scalar.demo_udf.demo_udf' LANGUAGE PYTHON;

CREATE TABLE IF NOT EXISTS `source` (
`sepal_length` FLOAT,
`sepal_width` FLOAT,
`petal_length` FLOAT,
`petal_width` FLOAT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1'
);

SELECT
demo_udf(ARRAY[`sepal_length`, `sepal_width`, `petal_length`, `petal_width`]) AS `classifier`
FROM
`source`;

注: 需提前创建Python虚拟环境,安装pyflink依赖且激活;若Python算子是CPU密集型任务,则需调整TM CPU个数

Summary

大数据技术发展日新月异,但_SQL NEVER DIE_ !本文讨论了如何利用UDF来来提升SQL的表达能力,希望可以提升你对SQL Is ALL Your Need的信心。但技术没有银弹,SQL不可能解决所有的问题。不管是SQL还是底层API,其实都是解决问题的方式,需要根据具体问题采用合适的工具,甚至可以结合SQL的通用性以及底层API的灵活性来优雅地解决问题~

本文所有SQL代码可参考:SQL IS ALL Your Need: Flink SQL UDF


SQL Is All Your Need: Flink SQL UDF
https://syntomic.cn/2023/05/11/SQL-Is-All-Your-Need-Flink-SQL-UDF/
作者
syntomic
发布于
2023年5月11日
更新于
2024年2月16日
许可协议