随着数据处理的逻辑变得越来越复杂,编写的SQL也会变得越加复杂,有时甚至会感觉SQL力不从心。这个时候就需要扩展SQL的表达能力,而UDF(用户自定义函数)就是这样一种扩张开发的机制,拓展系统的内置函数,实现自定义逻辑。本编文章我们就从具体场景出发,使用各种Flink UDF去优化或解决相关问题。
事前准备 UDF大致有以下几种数值函数(UDF): 将标量值转换成一个新标量值,如:DATE_FORMAT
表值函数(UDTF): 将标量值转换成新的行数据, 如:EXPLODE
聚合函数(UDAF):将多行数据里的标量值转换为成一个新标量值, 如:SUM
不同计算引擎有相应的实现方法,本编文章我们只考虑Flink自定义UDF 继承相应类:注意输出输入类型推导 实现相应方法 注册使用 注:文中不会给出详细代码,只会对一些重要的地方给予说明,详细实现可到代码库 中查看。
ETL 文章SQL Is All Your Need: Flink SQL 中我们提出了一个问题,如何简化ETL中重复的SQL代码,现在我们就用UDTF来实现这一目的。
1 2 3 4 5 6 7 8 9 INSERT INTO `dwd_log`SELECT REGEXP_EXTRACT(`log`, '(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (.*?) (.*)' , 1 ) AS `time `, REGEXP_EXTRACT(`log`, '(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (.*?) (.*)' , 2 ) AS `key_word`, JSON_VALUE (REGEXP_EXTRACT(`log`, '(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (.*?) (.*)' , 3 ), '$.key1' RETURNING INT ) AS `key1`, JSON_VALUE (REGEXP_EXTRACT(`log`, '(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (.*?) (.*)' , 3 ), '$.key2' ) AS `key2`FROM `ods_log`
原因:上述SQL看起来冗长最重要的原因是需要先将每个字段用正则提取,然后再对字段进行相同JSON解析操作,导致了很多重复代码。 解决:可以在Flink SQL中编写UDTF,与常规的标量函数只能返回一个值不同,它可以返回任意多行,且每一行可以包含多列。通过LATERAL算子将外表(算子左侧)的每一行跟表值函数返回的所有行(算子右侧)进行笛卡尔积,这样就一次性提取多个字段,那我们的代码就能简化很多: 1 2 3 4 5 6 7 8 CREATE TEMPORARY FUNCTION IF NOT EXISTS q_regex_extract AS 'cn.syntomic.qflink.sql.udf.table.QRegexExtract' LANGUAGE JAVA;SELECT `time `, `key_word`, `json`FROM `ods_log`, LATERAL TABLE (q_regex_extract(`log`, '(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (.*?) (.*)' , 'time' , 'key_word' , 'json' ))
1 2 3 4 5 6 7 8 9 10 11 12 13 LOAD MODULE hive WITH ('hive-version' = '3.1.3' );SELECT `time `, `key_word`, CAST (`key1` AS INT ) AS `key1`, `key2`FROM ( ... ) a, LATERAL TABLE (json_tuple(`json`, 'key1' , 'key2' )) AS b(`key1`, `key2`);
这样我们就利用UDTF,减少了SQL中重复代码,形成统一的清洗逻辑,同时避免Flink重复计算的问题 ,对比之前的实现就看起来逻辑更加清晰,也更加简洁~ 注: 我们再自定义函数中q_regex_extract
没有别名就可以直接相应字段, 这是因为我们自定义了类型推导, 而在函数json_tuple
中就必须显示别名相应字段:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public TypeInference getTypeInference (DataTypeFactory typeFactory) { return TypeInference.newBuilder() .outputTypeStrategy( callContext -> { List<Field> fields = new ArrayList <>(argsLen - 2 ); for (int i = 2 ; i < argsLen; i++) { String fieldName = callContext .getArgumentValue(i, String.class) .orElse("f" + (i - 2 )); fields.add(i - 2 , DataTypes.FIELD(fieldName, DataTypes.STRING())); } return Optional.of(DataTypes.ROW(fields.toArray(new Field [0 ]))); }) .build(); }
Analysis 文章SQL Is All Your Need: Flink SQL 中我们计算了每分钟滑动窗口的UV。现在我们把问题变复杂一点:每分钟计算一次当天截至到目前的累积UV
Flink SQL在1.13之后提供了累积窗口的计算方式: 1 2 3 4 5 6 7 8 SELECT DATE_FORMAT(`window_start`, 'yyyy-MM-dd HH:mm:ss' ) AS `window_start`, DATE_FORMAT(`window_end`, 'yyyy-MM-dd HH:mm:ss' ) AS `window_end`, `key_word` AS `dim`, COUNT (DISTINCT `key2`) AS `metric`FROM TABLE (CUMULATE(TABLE `dwd_log`, DESCRIPTOR(`rowtime`), INTERVAL '1' MINUTES, INTERVAL '1' DAYS))GROUP BY `key_word`, `window_start`, `window_end`;
但如果需要我们自己实现的时候,似乎比较难以实现?我们这里只考虑离线的情况,因为Flink(1.15)离线计算时暂不支持累计窗口去重计算。 一种比较直接的想法是首先计算每天所有的时间窗口,然后与小于当前结束窗口的数据JOIN, 计算出相应时间窗口的基数。但这样计算会导致数据膨胀严重,当数据量大时会导致数据倾斜等问题。可以采取下列方法进行优化:累积窗口计算只需要当天分组下最早时间的记录,这样可以大大减少数据量。 调整相应资源以及参数实现自适应执行。 其实大数据场景中有很多基数估算的方法,这里我们可以引入HLL(HyperLogLog) —— 基数统计的概率算法,用另外一种方法实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 CREATE TEMPORARY FUNCTION IF NOT EXISTS hll_agg AS 'cn.syntomic.qflink.sql.udf.aggregate.HLLAggregate' LANGUAGE JAVA;CREATE TEMPORARY FUNCTION IF NOT EXISTS hll_cardinality AS 'cn.syntomic.qflink.sql.udf.scalar.HLLCardinality' LANGUAGE JAVA;SELECT DATE_FORMAT(`min_time`, 'yyyy-MM-dd 00:00:00' ) AS `window_start`, FROM_UNIXTIME(UNIX_TIMESTAMP(`min_time`) + 60 ) AS `window_end`, `key_word` AS `dim`, hll_cardinality(hll_agg(min_hll) OVER (PARTITION BY `key_word` ORDER BY `min_time`)) AS `metric`FROM ( SELECT DATE_FORMAT(`time `, 'yyyy-MM-dd HH:mm:00' ) AS `min_time`, `key_word`, hll_agg(`key2`) AS min_hll FROM `dwd_log` GROUP BY DATE_FORMAT(`time `, 'yyyy-MM-dd HH:mm:00' ), `key_word` ) a;
在Flink中HLL不是预定义的数据结构, 所以累加器中需要将其视为RAW
数据格式,利用KYRO
序列化 1 2 3 4 5 public class HLLBuffer { @DataTypeHint(allowRawGlobally = HintFlag.TRUE) public HLL hll = new HLL (11 , 5 ); }
这样我们就利用udf引入新的数据结构,扩展了SQL的表达能力~ Python UDF 有时我们需要结合Python生态去实现动态执行、模型预测等功能,这个时候就可以引入Python UDF解决。这里我们以经典的鸢尾花机器学习分类为例。
原理进程模式:Python函数和Java算子之间采用Grpc服务通信 线程模式:Python函数和Java算子运行在同一个进程,利用FFI通信 实现: 1 2 3 4 5 6 7 8 9 10 class DemoUDF (ScalarFunction ): def open (self, function_context: FunctionContext ): self.model = joblib.load(path) def eval (self, value ): return self.model.predict(np.array(value).reshape(1 , -1 )).tolist()[0 ] demo_udf = udf(DemoUDF(), result_type=DataTypes.INT())
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 SET 'python.files' = './qflink-python/src/main/python/q-pyflink/udf' ;SET 'pipeline.operator-chaining' = 'false' ;SET 'python.execution-mode' = 'thread' ;CREATE TEMPORARY FUNCTION IF NOT EXISTS demo_udf AS 'scalar.demo_udf.demo_udf' LANGUAGE PYTHON;CREATE TABLE IF NOT EXISTS `source` ( `sepal_length` FLOAT , `sepal_width` FLOAT , `petal_length` FLOAT , `petal_width` FLOAT ) WITH ( 'connector' = 'datagen' , 'rows-per-second' = '1' );SELECT demo_udf(ARRAY [`sepal_length`, `sepal_width`, `petal_length`, `petal_width`]) AS `classifier `FROM `source`;
注: 需提前创建Python虚拟环境,安装pyflink依赖且激活;若Python算子是CPU密集型任务,则需调整TM CPU个数
Summary 大数据技术发展日新月异,但_SQL NEVER DIE_ !本文讨论了如何利用UDF来来提升SQL的表达能力,希望可以提升你对SQL Is ALL Your Need
的信心。但技术没有银弹,SQL不可能解决所有的问题。不管是SQL还是底层API,其实都是解决问题的方式,需要根据具体问题采用合适的工具,甚至可以结合SQL的通用性以及底层API的灵活性 来优雅地解决问题~
本文所有SQL代码可参考:SQL IS ALL Your Need: Flink SQL UDF