工作后做了一段时间的SQL BOY,但从20年底开始不满足于现状,正好业务也有越来越多的实时需求,所以开始学习实时计算。从Java写起到回归SQL再到结合Python的探索,算是对Flink有了一定程度上的了解,本文将回顾个人学习经历,供自省与借鉴~
从一个监控开始 当时业务面临的一个监控的场景:游戏中一个玩家每天获得的奖励是有限的,需要监控玩家获得物品情况,达到阈值时预警及时地进行处理。在2020年底的时候Flink已经在实时计算领域成为业界的事实标准,所以选择Flink进行实时处理也就成为了自然而然的事。由于当时我们的实时计算平台主要用Java代码编写应用,所以第一个Flink程序采用底层API实现的。
底层API:ProcessFunction 使用底层API编写Flink程序主要的难点是状态的处理。因为实时应用是长期运行的,如果不及时处理过期状态,最终会由于OOM导致程序崩溃。这里我们采用MapState 模拟窗口状态,并注册处理时间定时器按照清理过期状态:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public class MyProcessFunction extends KeyedProcessFunction <String, Tuple3<String, Long, Integer>, String> { private static final int THRESHOLD = 100 ; private static final long CLEAR_INTERVAL = 28 * 60 * 60 * 1000L ; private transient MapState<Long, Integer> aggState; @Override public void open (Configuration config) throws Exception { MapStateDescriptor<Long, Integer> expAggDescriptor = new MapStateDescriptor <>("Aggregation" , Long.class, Integer.class); aggState = getRuntimeContext().getMapState(expAggDescriptor); } @Override public void processElement (Tuple3<String, Long, Integer> value, KeyedProcessFunction<String, Tuple3<String, Long, Integer>, String>.Context ctx, Collector<String> out) throws Exception { Integer agg = value.f2; Long windowStart = getWindowStartWithOffset(value.f1); if (aggState.contains(windowStart)) { agg += aggState.get(windowStart); } aggState.put(value.f1, agg); if (agg > THRESHOLD) { out.collect("Alert Happens!" ); } ctx.timerService().registerProcessingTimeTimer(windowStart + CLEAR_INTERVAL); } @Override public void onTimer (long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { aggState.remove(timestamp - CLEAR_INTERVAL); } private long getWindowStartWithOffset (long timestamp) { long offset = 8 * 60 * 60 * 1000L ; long windowSize = 24 * 60 * 60 * 1000L ; long remainder = (timestamp - offset) % windowSize; if (remainder < 0 ) { return timestamp - (remainder + windowSize); } else { return timestamp - remainder; } } }
其实由于一开始不熟悉Java以及Flink相关API,导致了很多问题,比如没有正确清理状态,debug排查问题难、代码组织混乱等。这个故事告诉我们一开始不熟悉API时,应该从上层API开始应用,屏蔽底层架构的复杂性。
当然,随着经验的增多,我也再不断优化代码结构,提升开发效率:
执行环境和作业参数的模块化:默认参照最佳实践设置Kafka的动态分区:因为生产环境经常需要对数据源kafka进行扩缩容,所以一定要设置动态分区发现,避免丢失数据。 连接器可插拔化:自定义连接器,使得不同环境只需采用不同参数控制 中层API:Window 为了进一步提升效率,我会去想一想有没有更好的实现,可以不用自己管理状态。这时通过翻阅相关文档发现可以使用Window API,只需自定义Trigger就可以优雅地实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 DataStream<Tuple3<String, Long, Integer>> dataStream = ...; dataStream.keyBy(value -> value.f0) .window(TumblingEventTimeWindows.of(Time.days(1 ), Time.hours(-8 ))) .trigger(new Trigger <Tuple3<String, Long, Integer>, Window> () { private static final int THRESHOLD = 100 ; private final ValueStateDescriptor<Integer> stateDesc = new ValueStateDescriptor <>("agg" , Integer.class); @Override public TriggerResult onElement (Tuple3<String, Long, Integer> element, long timestamp, Window window, TriggerContext ctx) throws Exception { Integer agg = element.f2; ValueState<Integer> aggState = ctx.getPartitionedState(stateDesc); if (aggState.value() != null ) { agg += aggState.value(); } aggState.update(agg); if (agg >= THRESHOLD) { return TriggerResult.FIRE; } return TriggerResult.CONTINUE; } @Override public TriggerResult onProcessingTime (long time, Window window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } @Override public TriggerResult onEventTime (long time, Window window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } @Override public void clear (Window window, TriggerContext ctx) throws Exception { ctx.getPartitionedState(stateDesc).clear(); } }) .reduce((value1, value2) -> value2, new ProcessWindowFunction <Tuple3<String, Long, Integer>, String, String, TimeWindow>() { @Override public void process (String key, ProcessWindowFunction<Tuple3<String, Long, Integer>, String, String, TimeWindow>.Context context, Iterable<Tuple3<String, Long, Integer>> elements, Collector<String> out) throws Exception { out.collect("Alert Happens" ); } });
窗口算子会在窗口时间结束时自动清理窗口状态,所以不用我们操心状态。但窗口默认是在窗口结束时触发计算,所以我们需要自定义触发器,使得在达到阈值条件时马上触发。
这里需要注意的一个问题是:窗口聚合时使用ProcessWindowFunction 会保留窗口内全量元素导致状态过大,所以这里我们利用ReduceFunction 实现增量聚合,节约状态空间。
上层API:SQL 虽然刚开始时没有想过Flink SQL的实现,但为了内容的完整性,也再次补充相应实现,其他内容见后续:
1 2 3 4 5 6 7 8 9 10 11 12 SET table.exec.state.ttl = 2 d;SELECT DATE_FORMAT(`time `, 'yyyyMMdd' ) AS `window_start`, `key_word` AS `key`, MAX (`time `) AS `time `, SUM (`key1`) AS `metric`FROM `dwd_log`GROUP BY DATE_FORMAT(`time `, 'yyyyMMdd' ), `key_word`HAVING SUM (`key1`) > 100 ;
源码实现:Operator 做需求的同时需要有技术沉淀,当接手了几个类似的需求时,就可以慢慢去总结思考:有没有可能不用每次需求来的时候去开发代码,可不可以只需要配置相应规则,就可以马上生效。
具体的实现可以参考之前的文章——Flink规则引擎
利用广播流实现动态规则下放 仿造WindowOperator
原理实现自定义窗口配置 但DataStream API中窗口采用复制的实现,细粒度的滑动窗口会导致状态和定时器的膨胀,会有性能问题,而Flink SQL采用WindowSlice
实现,将滑动窗口转化为滚动窗口,尽可能地复用中间结果,降低状态压力。详情可见Flink SQL窗口表值函数(Window TVF)聚合实现原理浅析 ,后续可以借鉴相应思想继续优化~
Flink SQL 当大家都习惯了用一项技术进行开发时,你需要说服他人使用新的技术,就必须充分论证新的技术优势性。而Flink SQL这项技术的引入就是因为在累计在线人数这个问题上优势得到了体现。
技术引入 最开始我们采用了BloomFilter在底层API维护相应状态实现,但当数据规模大幅增加后,产生了数据倾斜问题,大量数据在一个节点计算,作业经常出问题。这时候刚好Flink 1.13发布,增加了累计窗口的功能,且只需通过设置拆分 distinct 聚合参数即可解决数据倾斜问题,正是引入Flink SQL的时机。此时我们的实时计算平台对Flink SQL的支持有限,但正是之前利用底层API开发应用的经验,让我可以利用Table API包装一层,达到类似只需写SQL的效果~
1 2 3 4 5 6 7 8 9 10 SET 'table.optimizer.distinct-agg.split.enabled' = 'true' ;SELECT DATE_FORMAT(`window_start`, 'yyyy-MM-dd HH:mm:ss' ) AS `window_start`, DATE_FORMAT(`window_end`, 'yyyy-MM-dd HH:mm:ss' ) AS `window_end`, `key_word` AS `dim`, COUNT (DISTINCT `key2`) AS `metric`FROM TABLE (CUMULATE(TABLE `dwd_log`, DESCRIPTOR(`rowtime`), INTERVAL '1' MINUTES, INTERVAL '1' DAYS))GROUP BY `key_word`, `window_start`, `window_end`;
随着业务对实时数据稳定性要求越来越高,我们也做了一些工作进行运维保障:
由于当时平台还未支持动态扩缩容,所以我们通过详细地压测得出合适的作业参数设置。 配置作业状态,消费延迟等监控,第一时间处理作业异常问题。 迁移专有集群,保障作业稳定性。 同时为了进一步地推广使用Flink SQL,必须简化上手复杂性,并增强功能。这时我就去研究开发了Flink SQL SDK。
Flink SQL SDK 首先为了提升Flink SQL的开发效率,实现了基础Java代码和SQL代码的解耦, 统一管理SQL代码,开发者虽然仍需克隆相应Java代码,但只需编写SQL就可以直接运行,无需编写原生代码。
其次为了提升Flink SQL的功能,自定义ClassLoder
动态加载UDF和Connector等相应Jar包,可以在不改动基础代码的前提下,增强SQL的表达能力,实现可参考FlinkSQL 动态加载 UDF 实现思路 。
最后为了推广Flink SQL的使用,利用UDTF和自定义Kafka多Topic Sink实现之前原生作业从全量数据中解析相应不同关键字到不同Topic的功能,同时也避免了Flink SQL UDF重复调用的BUG 。
虽然随着实时计算平台的不断发展,对SQL的支持在不断增强,可以直接通过Flink SQL网关进行SQL提交,可能Flink SQL SDK就没什么作用了,又逐渐退化成了SQL Boy。但这次通过相对底层地去赋予SQL更多能力,希望在实现SQL Is All Your Need 的路途上也留下自己的足迹。
PyFlink 随着数据分析和AI的发展,Python越来越流行,故Flink与Python生态的结合就会带来更多的可能性。在游戏反外挂的场景中,我们需要根据不同策略处理数据特征,实时筛选出开挂玩家进行处罚,而这里的策略一般会是安全人员采用的Python脚本编写,同时还会有复杂的算法策略,这正好就是PyFlink发挥的舞台~
技术引入 其实最开始我们是利用Jython让Python运行在JVM中实现,但Jython缺乏维护且兼容性太差,有时甚至需要手动将Python代码翻译成Java代码,难以应对复杂的需求。但随着实时平台支持PyFlink后,只需简单调用用户脚本即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class DemoUDF (ScalarFunction ): def open (self, function_context: FunctionContext ): model_path = "your model path" script_path = "your python script path" with open (script_path, "r" ) as f: code = f.read() exec (code, globals ()) self.model = user_open(model_path) def eval (self, value ): return user_eval(self.model, value)
PyFlink有两种模式:进程模式和线程模式。进程模式是靠Python进程和Java进程中进行通信,会有更好的资源隔离性,但会带来数据序列化上的性能损耗。而线程模式采用外部函数接口(FFI),避免数据的序列化,带来更好的性能。但在实践中也遇到了一些问题:
算子链:在开发原生Flink应用时,是尽可能将不同算子链在一起,避免数据在不同算子间的序列化性能损耗,但涉及到Python算子时,需要避免算子链,让Python算子获得充分资源。 CPU密集:和原生Flink应用大多是内存密集型的不同,Python算子很可能会是CPU密集的,这是就需要调整CPU资源才能提升运行效率。 实现方式:利用PyFlink + DataStream API 和 Flink SQL + Python UDF在相同资源下运行相同作业,发现Flink SQL + Python UDF运行效率会差很多,这个有可能是平台资源控制的问题,可惜这个问题暂时还没有答案。。采用Flink SQL + Python UDF可以复用代码,而改用PyFlink提交作业,需要将基础的Java代码转化为Python代码,会比较麻烦,这是我发现这个问题的原因。 流批一体 随着大数据的发展,涌现了各种各样的组件,比如存储有文件系统HDFS、消息队列Kafka, 计算离线用Spark、实时用Flink, 导致开发和维护成本都比较高。对开发而言,由于离线和实时采用不同的组件开发,最麻烦就是离线和实时的一致性保证。比如在安全业务中,需要离线统计算法策略命中人员的概率分布时,需要将实时代码全改为PySpark代码,费时费力。
但计算流批一体是Flink的一个特性,只需改动相应Connector,就可以离线运行,大大提升开发效率,而且简单对比了下,在同样资源下Flink效率甚至比Spark高。同时数据湖的概念也正在兴起,如果再利用数据湖实现存储流批一体,那未来对普通开发人员而言,应该不会再区分实时和离线,只需要用一份相同的SQL代码,就能完成各种数据开发工作,那时真正的挑战将是对业务的理解。
总结 从写离线SQL进入大数据领域, 到不满足于SQL Boy, 开始接触Flink,从底层API写起,最终又回到了SQL。也许之后大数据系统会回归到数据密集型的分布式系统,统一用SQL甚至自然语言作为开发,但希望在这一旅途中留下自己的足迹~