前言
在上一篇博客中,我们对欺诈检测引擎的目标和所需要的功能进行了描述,我们还描述了如何基于可修改的规则而不是使用硬编码的 KeysExtractor 实现 Flink 应用程序的数据自定义分区。
我们在上篇博客中特意省略了有关如何初始化应用的规则以及如何在作业运行时更新的细节,在本文我们将详细的介绍这些细节,你将学习如何将上篇博客中描述的数据分区防御与动态配置结合使用,当这两种模式结合使用的时候,可以省去重新编译代码和重新部署 Flink 作业的需要,从而可以应对多种业务场景逻辑修改的情况。
广播规则
首先让我们看看预定义的数据处理代码:
DataStream<Alert> alerts =
transactions
.process(new DynamicKeyFunction())
.keyBy((keyed) -> keyed.getKey());
.process(new DynamicAlertFunction())
DynamicKeyFunction 函数提供动态数据分区,同时 DynamicAlertFunction 函数负责执行处理数据的主要逻辑并根据已定义的规则发动告警消息。
在上篇文中中简化了用例,并假定已预先初始化了所应用的规则集数据,并可以通过 DynamicKeyFunction 的 List<Rules>
访问这些规则:
public class DynamicKeyFunction
extends ProcessFunction<Transaction, Keyed<Transaction, String, Integer>> {
/* Simplified */
List<Rule> rules = /* Rules that are initialized somehow.*/;
...
}
显然,在初始化阶段,可以直接在 Flink Job 的代码内部添加规则到此列表(创建List
对象;使用它的add
方法)。这样做的主要缺点是,每次修改规则后都需要重新编译作业。在真实的欺诈检测系统中,规则会经常更改,因此从业务和运营需求的角度来看,使此方法不可接受,需要一种不同的方法。
上一篇文章介绍了使用 DynamicKeyFunction
提取数据含groupingKeyNames
里面字段组成数据分组 key 的方法。此规则第二部分中的参数由 DynamicAlertFunction
使用:它们定义了所执行操作的实际逻辑及其参数(例如告警触发限制)。这意味着相同的规则必须同时存在于DynamicKeyFunction
和DynamicAlertFunction
。
下图展示了我们正在构建的系统的最终工作图:
上图的主要模块是:
- Transaction Source:Flink 作业的 Source 端,它会并行的消费 Kafka 中的金融交易流数据
- **Dynamic Key Function:动态的提取数据分区的 key。随后的
keyBy
函数会将动态的 key 值进行 hash,并在后续运算符的所有并行实例之间相应地对数据进行分区。 - Dynamic Alert Function:累积窗口中的数据,并基于该窗口创建告警。
Apache Flink 内部的数据交换
上面的作业图还展示了运算符之间的各种数据交换模式,为了了解广播模式是如何工作的,我们先来看一下 Apache Flink 在分布式运行时存在哪些消息传播方法:
FORWARD:上图中 Transaction Source 后的 FORWARD 意味着每个 Transaction Source 的并行度实例消费到的数据都将精确的传输到后面的 DynamicKeyFunction 运算符的每个实例。它还表示两个连接的运算符处于相同的并行度,这种模式如下图所示:
REBALANCE:这种情况下一般是手动的调用 rebalance() 函数或者并行度发生改变导致的,这样会导致数据以循环的方式重新分区,有助于某些情况喜爱的数据倾斜。
广播状态
为了使用规则数据流,我们需要将其连接到主数据流:
// Streams setup
DataStream<Transaction> transactions = [...]
DataStream<Rule> rulesUpdateStream = [...]
broadcastStream<Rule> rulesstream = rulesUpdateStream.broadcast(RULES_STATE_DESCRIPTOR);
// Processing pipeline setup
DataStream<Alert> alerts =
transactions
.connect(rulesstream)
.process(new DynamicKeyFunction())
.keyBy((keyed) -> keyed.getKey())
.connect(rulesstream)
.process(new DynamicAlertFunction())
如您所见,可以通过调用broadcast
方法并指定状态描述符,从任何常规流中创建广播流。在处理主数据流的事件时需要存储和查找广播的数据,因此,Flink 始终根据此状态描述符自动创建相应的广播状态。这与你在使用其他的状态类型不一样,那些是需要在 open 方法里面对其进行初始化。另请注意,广播状态始终是 KV 格式(MapState
)。
public static final MapStateDescriptor<Integer, Rule> RULES_STATE_DESCRIPTOR =
new MapStateDescriptor<>("rules", Integer.class, Rule.class);
连接rulesstream
后会导致 ProcessFunction 的内部发生某些变化。上一篇文章以稍微简化的方式介绍了ProcessFunction
。但是DynamicKeyFunction
实际上是一个broadcastProcessFunction
。
public abstract class broadcastProcessFunction<IN1, IN2, OUT> {
public abstract void processElement(IN1 value,
ReadOnlyContext ctx,
Collector<OUT> out) throws Exception;
public abstract void processbroadcastElement(IN2 value,
Context ctx,
Collector<OUT> out) throws Exception;
}
不同的是,添加processbroadcastElement
了方法,该方法是用于处理到达的广播规则流。以下新版本的DynamicKeyFunction
函数允许在 processElement 方法里面中动态的修改数据分发的 key 列表:
public class DynamicKeyFunction
extends broadcastProcessFunction<Transaction, Rule, Keyed<Transaction, String, Integer>> {
@Override
public void processbroadcastElement(Rule rule,
Context ctx,
Collector<Keyed<Transaction, String, Integer>> out) {
broadcastState<Integer, Rule> broadcastState = ctx.getbroadcastState(RULES_STATE_DESCRIPTOR);
broadcastState.put(rule.getRuleId(), rule);
}
@Override
public void processElement(Transaction event,
ReadOnlyContext ctx,
Collector<Keyed<Transaction, String, Integer>> out){
ReadOnlybroadcastState<Integer, Rule> rulesstate =
ctx.getbroadcastState(RULES_STATE_DESCRIPTOR);
for (Map.Entry<Integer, Rule> entry : rulesstate.immutableEntries()) {
final Rule rule = entry.getValue();
out.collect(
new Keyed<>(
event, KeysExtractor.getKey(rule.getGroupingKeyNames(), event), rule.getRuleId()));
}
}
}
在上面的代码中,processElement()
接收金融交易数据,并在 processbroadcastElement()
接收规则更新数据。创建新规则时,将如上面广播流的那张图所示进行分配,并会保存在所有使用 processbroadcastState
运算符的并行实例中。我们使用规则的 ID 作为存储和引用单个规则的 key。我们遍历动态更新的广播状态中的数据,而不是遍历硬编码的 List<Rules>
。
在将规则存储在广播 MapState 中时,DynamicAlertFunction 遵循相同的逻辑。如第 1 部分中所述,通过processElement 方法输入的每条消息均应按照一个特定规则进行处理,并通过 DynamicKeyFunction 对其进行“预标记”并带有相应的ID。我们需要做的就是使用提供的 ID 从 broadcastState 中检索相应规则,并根据该规则所需的逻辑对其进行处理。在此阶段,我们还将消息添加到内部函数状态,以便在所需的数据时间窗口上执行计算。我们将在下一篇文章中考虑如何实现这一点。
总结
在本文,我们继续研究了使用 Apache Flink 构建的欺诈检测系统的用例。我们研究了在并行运算符实例之间分配数据的不同方式,最重要的是广播状态。我们演示了如何通过广播状态提供的功能来组合和增强动态分区。在运行时发送动态更新的功能是 Apache Flink 的强大功能,适用于多种其他使用场景,例如控制状态(清除/插入/修复),运行 A / B 实验或执行 ML 模型系数的更新。
原文地址:http://www.54tianzhisheng.cn/2021/01/23/Flink-Fraud-Detection-engine-2/
英文作者:alex_fedulov
英文原文地址:https://flink.apache.org/news/2020/03/24/demo-fraud-detection-2.html