Flink CEP在实时风控场景的落地与优化
导读 随着用户对数据安全和网络安全的日益重视,实时风控技术的提升成为了当务之急。阿里云开源大数据团队通过改进 Flink CEP 的实现,支持了在 Flink 作业中动态加载规则,并且增强了 Flink CEP SQL 语法,充分发挥了 Flink 引擎的性能,在实时风控场景有不俗表现。
本次分享主要包括以下四大部分:
-
Flink CEP 的概念,以及一些典型的应用场景;
-
阿里云开源大数据团队内部对 Flink CEP 动态加载多规则特性的设计与实现;
-
阿里云开源大数据团队内部对 Flink CEP SQL 语法的增强,以及在性能和稳定性方面的优化;
-
实际客户支持中遇到的典型风控场景和常见实现技巧。
分享嘉宾|耿飙 阿里云 开发工程师
编辑整理|陈沛欣
内容校对|李瑶
出品社区|DataFun
01Flink CEP 介绍
Flink CEP 是复杂事件处理(Complex Event Processing)的缩写。它是基于Flink实现的复杂事件处理库。它的核心功能是识别输入数据流中符合特定模式,即Pattern的事件序列,并允许用户针对这些序列进行针对性处理。
1. 什么是Flink CEP
这里是一个简单的例子,可以让大家对 Flink CEP 做了什么事情有一个基础了解。
①. 模式。
首先,假设我们对模式ABBC感兴趣。它代表的实际含义可能是A类事件发生后,连续发生了两次B类事件,最后发生了一次C类事件。我们不要求这些事件是严格连续的,中间可以插入一些无关事件。
②. 事件流。
我们针对这种模式使用Flink CEP的API编写了相关代码。当Flink CEP作业启动后,遇到实际输入事件流,如图中的事件流,例如d1、a1、b1、b2、d2、c1、e1、a2,针对这样的事件数据流,Flink尝试识别定义的匹配ABBC,最终得到匹配结果。例如,这里的匹配结果是a1、b1、b2、c1。
③. 匹配。
识别出这样的匹配后,用户就可以在作业中定义如何针对这些匹配进行处理。常见的做法是将报警输入到下游系统或其他地方,然后进行进一步处理。
2. Flink CEP的应用场景
了解了Flink CEP的具体用途后,再来看一下Flink CEP通常用于解决哪些问题。在实际场景中,Flink CEP得益于Flink的分布式特性以及毫秒级处理延迟和丰富的规则表达能力,得到了较多应用。
这里举三个典型的例子。
①. 实时风控。
Flink CEP 可应用于风险用户检测,例如读取并分析用户行为日志,将 5 分钟内转账次数超过 10 次且金额大于 10000 的客户识别为异常用户,并进行针对性的风险提示。
②. 实时营销。
我们可以利用 Flink CEP 来优化营销场景中的策略。比如检测用户行为日志,从而在电商大促时,找到"10 分钟内,在购物车中添加超过 3 次的商品,但最终没有付款"的用户,针对性的调整营销策略。类似的逻辑也可以应用在实时营销的反作弊场景中,这已经在钱大妈以及阿里内部有一些具体的落地案例。
③. 物联网。
Flink CEP 可以用于检测异常状态并发出告警,比如共享单车被骑出指定区域,且 15 分钟内没有回到指定区域时发出风险提示。如果和物联网传感器结合,还可以用于检测工业生产中的流水线异常。比如检测到三个时间周期内,温度传感器都反馈温度超过设置阈值,就发布报警等等。
02动态多规则支持
接下来我们介绍阿里云实时计算团队基于Flink社区的FLIP-200所做的动态多规则支持。
1. 动态规则支持的背景
在介绍我们为什么需要动态规则更新前,先看一下右边这个图,确定规则究竟包含哪些要素。我们认为 Flink CEP 中的规则,或者我们刚才提到的 Pattern,由三部分组成,即阈值、条件和事实。
下面我们以"五分钟内通过广告链接访问某商品超过五次,但最终没有购买"为例来介绍这三个要素。
- 阈值:例如5次就是我们定义的阈值,可以根据需要改成7次或者10次等。
- 事实:是规则针对的动作,例如通过广告链接访问某商品以及购买等。
- 条件:用于描述我们如何根据阈值和事实去过滤我们想要的事件或动作。
在明确了这三个组成要素之后,我们可以理解为什么需要支持动态规则更新。频繁变化的现实场景要求我们对初始规则内容进行调整或添加新的规则。
比如有一个 CEP 作业会在某个用户在一分钟内连续进行某操作超过 10 次后将其认为是风险用户。
在特殊场景,如流量暴增或举办某些活动时,预期访问次数会比平常多一些。10次的阈值就不太合适,我们可能想改成20或者30。在当前的开源 Flink 实现下,如果想实现这一步,只能重新编写 Java 代码,然后重启作业,以使最新的规则生效。
这种做法的问题很明显:
- 规则生效的时间成本较高。因为要走完整的代码开发和打包上线等一系列流程。而风控领域的作业通常对延迟比较敏感。对于这些对延迟敏感的作业来说,上述时间成本难以接受。
- 如果规则的时间窗口较长且状态较大,重启作业的代价会更高。
因此,我们需要支持动态规则更新,也就是所谓的不重启Flink作业来更新作业中实际应用的规则。
为了实现这一点,我们列出了两个关键问题。
- 如何让 Flink 作业不停机地加载新规则。
- 如何解决规则(Pattern)的序列化和反序列化。这是从第一个问题衍生而来。如果想让作业不停机加载,作业就必须从某个地方拿到动态拿到新的 Pattern,并生成对应的 Pattern 对象在作业中使用。
在其他公司的生产实践中,我们也看到了针对上述两个问题,大家提出了一些自己的解决方案。例如修改 Flink CEP 内部实现,即在 Flink CEP Operator 上添加注入规则的接口,使 Flink CEP Operator 在作业运行中可以不停机地加载新规则,以及基于 groovy 引擎动态生成 Pattern 对象,从而解决序列化和反序列化问题。
然而,我们也注意到,这样的实现方案存在一定的缺点。
- 数据库压力增大:通常情况下,规则都存储在数据库中。对 Flink CEP Operator进行修改时,会让 Flink CEP Operator 直接与数据库交互,拉取最新规则。这样一来,当 Flink CEP 的作业并发较多时,对于大作业中的每一个 Flink CEP 并发,都需要连接数据库读取规则,这会给数据库带来额外压力。
- 拉取规则同一性无法严格保证:可能 Subtask1 取得了规则一的某一个版本,而Subtask2 由于网络问题或其他各种问题,拉取到了规则一的其他版本。这会导致不同并发之间使用了不同的规则,最终导致整个 Flink CEP 作业在逻辑上的不一致。
- 不利于拓展多规则支持:在修改 Flink CEP 并添加注入规则接口时,通常只支持修改单条规则,这并不利于拓展到对多规则的支持。
使用 Groovy 引擎动态生成 Pattern 对象也存在一些有待提高之处。
- 表达能力有限:通常只能配合 Aviator 表达式动态修改阈值,但较难修改规则整体逻辑。
- Groovy 脚本的编辑需要较多编程知识:对于风控策略人员或运营人员来说,他们可能对 Groovy 脚本的语法不太了解,这会产生额外的学习成本。
- 生成规则耗时较长:Groovy 是一个较重量级的引擎,生成规则的耗时相对较长。
2.FLIP-200的提出
在考虑以上背景和问题后,我们在Flink社区提出了 FLIP-200提案,并在阿里按照FLIP-200提案实现了一版 Flink CEP 中动态规则的支持。接下来将详细介绍我们是如何实现的,以及如何解决之前提到的一些问题。
首先我们新增了 PatternProcessor 接口,用于完整定义Flink CEP中的一条规则。
PatternProcessor 包含 getId,getVersion 等用于获取该 Pattern 唯一标识符的方法;getTimestamp 等用于获得时间戳,进行调度的方法;getPattern 对象用于拿到 PatternProcessor 所内嵌的规则;PatternProcessorFunction 用于描述如何处理找到的匹配。除此之外,为了功能的完整性,我们还添加了
PatternProcessorDiscoverer 和 PatternProcessorManager,用于描述如何发现和管理 Processor。
下面介绍一下在动态规则支持中的具体设计。
3.动态规则支持中的具体设计
首先,介绍一下 Flink 的 Operator Coordinator 机制。Operator Coordinator 顾名思义,它负责协调 Flink 作业中的各个 operator。Operator Coordinator 本身运行在 Job Manager 中,它可以向每个下游 Task Manager 中的每个 Operator 发送一些事件。
它之前主要在 Flink 的 Source 和 Sink 中应用,用于发现和分配读写的 workload,以确保不会出现过于严重的数据倾斜等问题。我们也复用了这一机制实现了 Dynamic CEP Operator Coordinator。它是一个在 JobManager 中运行的线程,它会调用我们刚才提到的 Pattern Processor Discover 接口从数据库拿到序列化后的最新Pattern(如图绿色圆圈里的P)。拿到之后,它会发送给下游与之关联的 Dynamic CEP Operator。这些 Dynamic CEP Operator 会接受 Operator Coordinator发送的事件,并解析和反序列化成实际使用的 Pattern Processor,然后构造对应的 NFA(非确定有限状态机)。之后即可使用新构造的NFA来处理上游发生的事件,并最终输出到下游。另外,我们允许一个 CEP Operator 包含多个 NFA,这样可以比较好的支持多规则。
基于这样的设计,我们可以实现不停机更新规则内容,且仅有Operator Coordinator 会与外部规则数据库进行交互,减少了对数据库的访问,并且由于Flink框架保证了 Operator 在处理来自 Operator Coordinator 的事件的一致性,我们也保证了各个 Subtask 所使用的规则的一致性。
4.动态规则支持中 Pattern 的序列化和反序列化
接下来为大家介绍动态规则支持:Pattern的序列化和反序列化。
Pattern 本质上是描述了规则匹配时使用的NFA(非确定状态自动机)的状态转换图, 即根据输入事件如何从一个状态转移到另一个状态。
有了这样一个基础观察后,我们可以了解到NFA对应一个状态转换图。我们可以稍作简化,例如将一个复合 Pattern 规则定义为一个图。在这个图中,每个节点是一个子Pattern,而"边"则是事件选择策略,也就是说,我们如何从子 Pattern1 的匹配中跳转到 Pattern2 的匹配。我们也可以将每个图看作一个更大的图的子节点,从而实现模式的嵌套。也就是说,某个模式的子 Pattern 本身也可以是一个完整的复合Pattern 。
那么我们该如何描述这个图呢?在设计过程中,我们有一些基础的想法或设计原则。
- 应具备完整的表达能力。
- 序列化和反序列化相对方便。
- 易于拓展,方便集成。
- 格式应该能读取、编辑。
基于这些原则,我们最终选择基于 JSON 来定义一套描述 Pattern 的规范。现在我们给出一个简单的例子来展示我们所定义的这套 JSON 规范。
首先,我们可以用 Java API 大致定义这样一个示例Pattern:当满足 StartCondition 的事件出现大于等于三次之后,如果跟着一个满足 EndtCondition 的事件,那么我们就认为这是一个匹配。我们看到这个 Pattern 有两个子 Pattern,第一个是 StartPattern,第二个是 EndPattern。他们定义的一个图可能类似于下面这样一个状态转换图。
这里P1对应着 Start Pattern,而P2对应着End Pattern。在这张状态转换图中,有两条边,第一条是P1指向自己的边,代表满足P1规则的事件可以重复出现。另外也有一条从 StartCondition 指向 EndCondition 的边。
右图则是这个状态转换图的 JSON 描述。第一个是 node 字段,它是一个数组,包含每个子 Pattern 的完整描述,比如这里我们用 times 字段表示这个子 Pattern 对应的 Condition,要被满足大于等于三次。第二个是 edges 字段,它用于记录边的信息。
关于各个字段的完整定义、取值以及物理描述,可以参考阿里云官方官网文档。
用户可以通过这一套规范来使用 JSON 语法描述一个完整的 Pattern,从而实现 Pattern 的序列化和反序列化。在数据库中,实际存储的可能是一个对应的 JSON 字符串,它可以反序列化成一个对应的 Pattern Processor 对象。
6.动态规则支持:拓展Condition
在描述完刚才提到的序列化和反序列化之后,我们将继续介绍如何将这些功能应用于动态修改 Condition 中的阈值。
①. Aviator Condition
我们基于 Aviator 表达式引擎定义了 AviatorCondition。它的构造函数会接受一个表达式字符串,并根据输入的表达式字符串生成 AviatorExpression,然后在 filter 方法中通过反射来解析传入的事件字段和阈值,执行 AviatorExpression,最后返回 true or false 作为 filter 这个方法的返回结果,用于判断是否满足 Aviator Condition。
我们这里举一个例子,假设有一个叫 Event 的类,它有 price 和 action两个字段。那么我们就可以构造一个这样的 AviatorCondition,其参数是一个表达式字符串,这个字符串里描述了对 Event 中事件字段的取值要求。比如我们要求 action==1&&price>20。如果我们想要更新阈值,就直接修改表达式,变成 action==0&&price>50。
注意这个字符串是传入的参数,它也可以在我们刚才介绍的 JSON 格式中定义和描述,所以我们也可以直接编辑数据库中的字段进行阈值的动态更新。
②. Groovy Condition
除了 Aviator 之外,为了满足不同用户的使用习惯,我们也支持了 Groovy 语法,允许将 Groovy 表达式作为参数生成对应的 Condition。
③. Custom Args Condition
除此之外,为了更进一步增强表达能力,支持一些高度定制化的需求,我们还实现了 Custom Args Condition,它支持用户传入自定义参数,并允许用户自定义参数的解析逻辑。例如我们可能想自定义解析逻辑支持更复杂的处理需求。我们之前提到的两个 Condition(Aviator Condition和Groovy Condition)可以看作是 Custom Args Condition 的一个特殊实现。
7.动态规则支持------多规则支持
接下来介绍多规则的支持。
多规则的关键点在于如何在同一输入数据流上使用多条规则。按照开源 Flink CEP 的方案,我们要在一个 Flink 作业中使用多条规则的话,需要定义多个 Pattern Stream(CEP API 提供的接口),对应生成多个 CEP Operator。因此上游 input Source 会向不同的 Flink CEP Operator 发送数据,这也代表着上游的数据需要多次网络传递,从而带来一些额外的开销。
我们这里进行了一个优化,允许一个 Dynamic Flink CEP Operator 创建多个 NFA,这样上游数据只需传递一次,避免了额外的拷贝和网络数据传输,降低资源消耗。
8.动态规则支持:Demo
我们在阿里云官网上也提供了一个针对广告投放中的实时反作弊场景实现的 demo,用于演示使用。相应的代码也在阿里云上开源。如果大家有兴趣,可以去这里的网址查看。
++https://help.aliyun.com/document_detail/459880.html++
++https://github.com/RealtimeCompute/ververica-cep-demo++
03CEP SQL语法增强&性能优化
接下来介绍一下我们在 CEP SQL 方面所做的一些工作。
1.Flink CEP SQL:介绍
Flink CEP SQL 的核心是 MATCH_RECOGNIZE 语法,这是提到的 SQL2016 中定义的一套规范。它主要包含以下几个部分:
①. PARTITION BY:用于定义逻辑,例如这里是 PARTITION BY user_name,它相当于 group by user_name 或者 Java API 中的 key_by。
②. ORDER_BY:用来定义输入数据的排序方式。因为 CEP 需要在持续数据流中识别特定模式,所以它必须按时间顺序进行排列。通常这里的字段是 row_time。
③. MEASURES:类似于普通SQL中的select操作,用于对识别出的序列进行映射聚合等操作,并定义最终输出结果。在图中,我们使用了 FIRST 、LAST 和 COUNT 这些内置函数,对循环模式匹配到的A序列进行了聚合计算。对于B序列,我们只做了一个映射操作。
④. PATTERN:它是 MATCH_RECOGNIZE 语法的核心,也就是我们所说的Pattern 定义。它类似于正则表达式的语法。例如 A+B 模式表示序列中允许先出现一个或连续多个 A 事件,再紧接着出现一个 B 事件。
⑤. DEFINE:它用于设定了A和B两个模式变量对应的事件匹配条件。例如,A 对应的匹配条件是"event_type为A"。
2.Flink CEP SQL:示例
这里是一个 CEP SQL 的例子。假如我们有一个 Source 表,里面7条数据,username 取值中有 Alice、Bob 等。如果对于这个表来应用我们刚才上一个 PPT 中的A+B模式去做匹配,那么我们得到图中这样的结果表。
现在针对Alice进行具体说明:Alice用户的事件序列包含4个对应事件,由于我们定义了 A+B 的 Pattern 以及 Flink CEP SQL 中使用的默认 After Match Strategy是 SKIP_TO _NEXT,所以实际输出了三行。第一行对应序列 AAAB,第二行对应序列是 AAB,第三行对应序列是 AB。
3.Flink CEP SQL:语法增强
刚才是基础的 Flink CEP SQL 语法,接下来介绍在阿里内部针对 CEP SQL 语法所做的一些增强。
①. 输出带时间约束模式的匹配超时序列
在用户行为模式识别的场景中,如果我们希望找到在10分钟内完成一系列动作的高质量用户,我们可以通过 WITHIN INTERVAL 语法来指定这个时间约束。
对于 Alice 用户,它在10分钟内完成了指定的三个动作,这满足了我们所说的10分钟内的时间约束,因此会有一条输出。对于 Bob 来说,虽然也满足了 ABC 的模式,但因为满足条件A的事件是在8:02完成的,而满足条件 C 的事件是在8:15完成的,相隔时间已经超过了10分钟,所以它没有对应输出。
我们刚才提到的典型场景是不输出 Bob 的场景。但有时候,我们可能想找到流程中断的原因,例如为什么 Bob 在10分钟内只做 A、B 两件事而没有做 C,即需要输出超时的事件匹配序列(在时间限制内未完全匹配的事件序列)。
为此,我们支持了定义 ONE ROW PER MATCH SHOW TIMEOUT MATCHES 语法,它的含义是允许展示匹配超时的事件,而不仅仅是展示匹配成功的事件。在正常的 Java API 中,这些超时序列是通过 Java API 的侧输出流,即所谓 SideOutput输出到另一个 datastream 来实现的。
在 SQL 中,我们无法使用侧输出流,因此会输出在结果表中。但如果 C 动作没有发生,也就是 Bob 在10分钟内没有完成这件事情,那么 action_c_time 列会出现 Null 值。这样便于用户在后续针对性地过滤出 Bob 这类特殊用户。
②. 定义事件之间的连续性
Flink CEP 支持不同种类的事件连续性。例如,A.next(B) 要求事件 A 和事件 B 之间必须连续出现,中间不能有任何其他类事件,如 C 类事件等。这类连续性我们称之为严格连续。A.followedBy(B) 则允许 A 和 B 之间可以出现其他事件。这类连续性我们称之为松散连续。而开源 CEP 只支持严格连续。我们通过添加{- x*?-}语法来支持使用松散连续。
其原理是 X 未在 DEFINE 当中定义,那么它代表任意匹配。也就是说,我们允许 A和 B 之间出现任意一个无关事件,对应我们所说的松散连续。
③. 定义循环模式中的连续性和贪婪性
除了刚才的连续性增强外,我们还支持定义循环模式中的连续性和贪婪性。具体来说,循环模式中,例如这里的 A+,是一个单独的循环模式。我们允许 A 类事件出现一次或多次,默认要求是严格连续的 A 类事件,即不能有任何其他类型的事件。同时该模式会贪婪地匹配,尽可能地匹配最长的 A 序列。
我们这里实现了2个增强:
- 支持非贪婪的语义。即尝试尽可能匹配短序列,而非长序列。
- 允许模式内部的松散连续。
具体语法上,一个问号(例如 A+?)对应非贪婪,两个问号(例如 A+??)对应的是松散连续且贪婪,而三个问号对(例如 A+???)应的松散连续且非贪婪。
④. 循环模式指定的停止条件(Until)
这对应 Java 的 until 语法。表示允许在循环模式中,例如 A+{B}C 模式在匹配 A 类事件时,如果遇到 B 类事件,那么会立刻终止 A 类循环模式的匹配,进入到下一部分 C类事件匹配中。
⑤. 组合模式(Group Pattern)
最后一部分增强是组合模式。我们可以将多个模式组合成一个整体用在 next()、followedBy()这些函数中,支持整体的嵌套循环。它对应着括号语法。
⑥. AFTER MATCH NO SKIP 策略
最后一个 CEP SQL 的增强则是对 AFTER MATCH NO SKIP 策略的支持。
Flink CEP SQL 中默认的策略是 SKIP_TO_NEXT_ROW,它会丢弃以相同事件开始的所有部分匹配。实际 java API 中默认的是 NO_SKIP 策略,它会把每个成功匹配都输出出来。
这里有一个例子,我们可以比较这两个模式的不同。例如对a b+模式来说,如果输入的是a1、b1、b2、b3,如果是NO_SKIP策略,它会输出三条匹配结果,即a1b1、a1b2、a1b2b3。而对于SKIP_TO_NEXT来说,它只会出a1b1。原因是我们在找到a1b1匹配之后,所有以a1开头的匹配都会被丢弃,就是所谓的SKIP_TO_NEXT_ROW 。但 NO_SKIP 是更常见的使用策略,所以我们也拓展了对它的支持。
4. FLink CEP 性能优化
这部分简要介绍一下我们在内部对 Flink CEP 进行的性能优化。
①. 减少State访问
通过增加 Cache、优化 OnEvent/ProcessingTime()实现,减少了大约30%的state 访问,从而使大规模 Flink CEP 作业对 CPU 的消耗更少。
②. 修复 State 泄露
这是针对社区的一个开源bug的修复。主要针对生命周期较短的 key,如果这些 key的相关状态没有及时清理,可能会导致 state 不断增大。当 key 包含一些随机字段,例如 timestamp 或随机 ID 时,该问题非常容易出现。
③. 小贴士
如果大家想使用 Flink CEP,尽量使用 ver1.16 以上的版本。在这些较新的版本中,社区有一个关键优化,可以减少 Timer 注册,从而极大幅度地减少作业的 CPU 消耗。
04风控场景实际案例
最后,介绍一下我们在支持客户中遇到的风控场景的典型应用。
1.业务类场景应用
①. 交易风控
例如客户在电商交易或银行交易中,可能会想检测一些特殊用户。如一段时间内某个IP退款次数超过一定金额,就触发熔断,禁止该IP进行额外交易。交易风控是我们在支持客户中遇到的最多的应用场景。
②. 内容风控
例如用户在x分钟内发布超过y条帖子,对账号进行禁言或其他处理等。
③. 物联网风控
例如,检测设备异常,如果某个设备连续发生超过10次以上的异常,且超过15分钟内没有恢复,则发出报警消息等。
④. 网安风控
检测到某台电脑上的日志,例如用户行为满足点击钓鱼邮件、下载异常文件、执行隐藏代码等条件后触发报警。
2.新功能应用
初次接触 FlinkCEP 的用户可能对 Flink CEP 中的部分功能或用法不太熟悉。我们提供了2个比较常用的功能的使用示例,分别是用于获取子 Pattern 之前匹配的事件
context.getEventsForPattern() 接口,以及 Flink1.16 引入的用于定义相邻事件之间的时间间隔的新语法:WithinType.Previous_AND_CURRENT 。如果有类似需求的客户也可以参考。
以上就是本次分享的内容,谢谢大家。