复杂事件编程(CEP)是一种基于流处理的技术,将系统数据看作不同类型的事件,通过分析事件之间的关系,建立不同的时事件系序列库,并利用过滤、关联、聚合等技术,最终有简单事件产生高级事件,并通过模式规则的方式对重要信息进行跟踪和分析,从实时数据中心发掘有价值的信息。复杂事件处理主要应用于防范网络欺诈、设备故障检测、风险规避和智能营销等领域。目前主流的CEP工具具有Esper,Jboss Drools和上夜班的MicroSoft StreamInsight等,Flink基于DataStream API提供了FlinkCEP组件栈,专门用于对复杂事件的处理,帮助用户从流式数据中发掘有价值的信息。

基础概念

FlinkCEP 说明

一个或多个由简单事件构成的事件流通过一定的规则匹配,然后输出用户想得到的数据,满足规则的复杂事件。具备如下的特征:

  • 目标:从有序的简单事件流中发现一些高阶特征
  • 输入:一个或多个由简单事件构成的事件流
  • 处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件
  • 输出:满足规则的复杂事件

flink-cep

CEP用于分析低延迟、频繁产生的不同来源的事件流。 CEP 可以帮助在复杂的、不相关的事件流中找出有意义的模式和复杂的关系,以接近实时或准实时的获得通知并阻止一些行为。

CEP支持在流 上进行模式匹配,根据模式的条件不同,分为连续的条件或不连续的条件;模式的条件允许有时间的限制,当在条件范围内没有达到满足的条件时,会导致模式匹配超时。

CEP用于分析低延迟、频繁产生的不同来源的事件流。 CEP 可以帮助在复杂的、不相关的事件流中找出有意义的模式和复杂的关系,以接近实时或准实时的获得通知并阻止一些行为。

环境准备

这里,我们需要引入相关的依赖包。

1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
<version>1.9.0</version>
</dependency>

基本概念

事件定义

简单事件

简单事件存在于现实场景中,主要特点为处理单一事件,事件的定义可以直接观察出来,处理过程中无需关注多个事件之间的关系,能够通过简单的数据处理手段将结果计算出来。
复杂事件

相对于简单事件,复杂事件处理的不仅是单一的事件,也处理由多个事件组成的复合事件复杂事件处理监测分析事件流(Event Streaming),当特定事件发生时来触发某些动作。

事件关系

复杂事件中事件间包含多种关系,常见的有时序关系聚合关系层次关系依赖关系以及因果关系
时序关系

动作事件与动作事件之间、动作事件与状态变化事件之间,都存在时间顺序。事件和事件的时序关系决定了大部分的时序规则,例如: A 事件状态持续为 1 的同时 B 事件状态为 0 等;
聚合关系

动作事件和动作事件之间,状态事件和状态事件之间都存在聚合关系,即个体聚合形成整体集合。例如: A事件状态为 1 的次数为 10 触发预警。
层次关系

动作事件和动作事件之间,状态事件和状态事件之间都存在层次关系,即父类事件和子类事件的层次关系,从父类到子类是具体化的,从子类到父类是泛化的。这个可以对比Java里面的继承关系。
依赖关系

事物的状态属性之间彼此的依赖关系和约束关系,例如 A事件状态触发的条件前提是B事件触发,则AB之间形成了依赖关系。
因果关系

对于完整的动作过程,结果状态为果,初始化状态和动作都可以视为因。

事件处理

复杂事件处理的目的是通过相应的负责对实时数据执行形影的处理策略,这些策略包括了推断、查因、决策、预测等方面的应用。
事件推断

主要利用事务状态之间的约束关系,从一部分状态属性值可以推断出另一部分的状态属性值。举个栗子:1,1,2,3,5,8 …… ,我们可以推断出后面的是: 13,21 ……
事件查因

当出现结果状态,并且知道初识状态,可以查明某个动作的原因;同样,知道结果,知道过程,就可以查明初始状态的原因。这个相当于:f(x) = kx + b ,知道f(x) , 知道 kx+b , 那我们就知道 x .
事件决策

想得到某个结果状态,知道初始状态,决定执行什么动作。该过程和规则引擎相似,例如某个规则符合条件后出发行动,然后执行报警等操作。
事件预测

该种情况知道事件初始状态,以及将要做的动作,预测未来发生的结果状态。例如:天气预报。

Pattern API

FlinkCEP 提供了 Pattern API 用于对输入流数据的复杂事件规则定义,并从事件流中抽取事件结果。

flink-cep-core

每个Pattern 都应该包含几个步骤,或者叫做 state 。从一个 state 到另一个 state . 例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Pattern.begin[LoginEvent]("begin")
.where(_.eventType.equals("fail"))
.next("next")
.where(_.eventType.equals("fail"))
.within(Time.seconds(5))

// 或者如下
Pattern.begin[Event]("start")
.where(_.typeEvent.equals("temperature"))
.next("middle")
.subtype(classOf[TempEvent])
.where(_.temp > 35.0)
.followedBy("end")
.where(_.name.equals("end"))

说明:

  1. 每一个state都应该有一个标识,比如:begin[LoginEvent]("begin")这里的 “begin” 和 begin[Event]("start") 这里的 “start”.
  2. 每个state 都需要有一个唯一的名字,而且需要一个 filter 来过滤条 件,这个过滤条件定义事件需要符合的条件.例如:.where(_.eventType.equals("fail"))
  3. 我们也可以通过 subtype 来限制 Event的子类型,例如:.subtype(classOf[TempEvent])
  4. 事实上,你可以多次调用subtype 和 where 方法;而且如果 where 条件是不相关的,你可以通过 or 来指定一个单独的 filter 函数:pattern.where(...).or(...);
  5. 之后,我们可以在此条件基础上,通过next 或者 follow edBy 方法切换到下一个state next 的意思是说上一步符合条件的元素之后紧挨着的元素;而 followedBy 并不要求一定是挨着的元素。这两者分别称为严格近邻和非严格近邻。
  6. 最后,我们可以将所有的Pattern 的条件限定在一定的时间范围内:within(Time.seconds(5))
  7. 时间可以是 Processing Time , 也可以是 Event Time.

Pattern检测

1
2
val patternStream: PatternStream[LoginEvent] = CEP.pattern(loginEventSource, pattern)
patternStream.select(loginEventSource.keyBy(_.userID) , loginfailPattern)

一旦获得PatternStream ,我们就可以通过 select 或 flatSelect ,从一个 Map 序列找到我们需要的警告信息。

select

select方法需要实现一个 PatternSelectFunction ,通过 select 方法来输出需要的警告。它接受一个 Map 对,包含 string/event ,其中 key 为 state 的名字, event 则为真实的 Event。

1
2
3
4
5
6
7
8
val loginfailPattern = patternStream.select(
(pattern: Map[String, Iterable[LoginEvent]]) => {
val first = pattern.getOrElse("begin", null).iterator.next()
val second = pattern.getOrElse("next", null).iterator.next()

Warning(first.userID, first.eventTime, second.eventTime, "warning")
})

其返回值仅为1 条记录。

flatSelect

通过实现PatternFlatSelectFunction ,实现与 select 相似的功能。唯一的区别就是 flatSelect 方法可以返回多条记录, 它通过一个 Collector[OUT] 类型的参数来将要输出的数据传递到下游。
超时事件的处理

通过 within 方法,我们的 parttern 规则 将匹配的事件 限定在一定的窗口范围内。当有超过窗口时间 之 后 到达的 event ,我们可以通过在 select 或 flatSelect 中,实现PatternTimeoutFunction 和 PatternFlatTimeoutF unction 来处理这种情况。

1
2
3
4
5
6
7
8
9
10
val out: OutputTag[String] = OutputTag[String]("side-output")
patternStream.select(out){
(pattern:Map[String,Iterable[Event]],timestamp:Long)=>{
TimeoutEvent()
}{
(pattern:Map[String,Iterable[Event]],timestamp:Long)=>{
ComplexEvent()
}
}
}

大体的看完之后,我们FlinkCEP编程也基本就是酱紫。那现在就来详细一点的说一下。

模式定义

个体Pattern可以是单次执行模式,也可以是循环执行模式。单次执行模式一次只接受一个事件,循环模式可以接受多个事件。通常情况下,可以指定循环次数将单次执行模式变为循环执行模式。每种模式能够将多个条件组合应用到同一事件之上,条件组合可以通过 where 方法进行叠合。

个体 Pattern 都是通过 begin 方法定义的,例如以下通过 Pattern.begin 方法基于 Event 事件类型的 Pattern , 其中 时指定的 PatternName 对象。

1
val start = Pattern.begin[Event]("start_pattern")

下一步通过 Pattern.where() 方法在 Pattern 指定 Condition , 只有当 Condition 满足之后,当前的 Pattern 才会接收事件。

1
start.where(_.typeEvent.equals("temperature"))

指定循环次数

对于已经创建好的 Pattern , 可以指定循环次数,形成循环执行的 Pattern , 且有 3 种 方式来指定循环方式。

  • times : 可以通过 times 指定固定的循环执行次数
1
2
3
4
// 指定2循环触发 4 次
start.times(4)
// 可以指定循环次数范围
start.times(2 , 4)
  • optional : 也可以通过 optional 关键字指定要么不触发,要么触发指定次数
1
2
3
4
// 指定2循环触发 4 次
start.times(4).optional()
// 可以指定循环次数范围
start.times(2 , 4).optional()
  • greedy: 可以通过 greedy 将 Pattern 标记为 贪婪模式,在 Pattern 匹配成功的前提下,会尽可能多的触发
1
2
3
4
// 触发 2,3,4 次,尽可能重复执行
start.times(2 , 4).optional()
// 触发 0,2,3,4 次,尽可能重复执行
start.times(2 , 4).optional().greedy()
  • oneOrMore: 可以通过 oneOrMore 方法指定触发一次或多次
1
2
3
4
5
6
7
8
// 触发一次或者多次
start.oneOrMore()
// 触发一次或者多次,尽可能重复执行
start.oneOrMore().greedy()
// 触发 0 次或者 多次
start.oneOrMore().optional()
// 触发 0 次或者 多次 , 尽可能多次执行
start.oneOrMore().optional().greedy()
  • timesOrMore: 通过 timesOrMore 方法可以指定触发固定次数以上,例如执行两次以上:
1
2
3
4
// 触发两次或者多次
start.timesOrMore(2)
// 触发两次或者多次,尽可能多次重复执行
start.timesOrMore(2).greedy()

模式条件

每个模式都需要指定触发条件,作为时间进入到该模式是否接受的判断依据,当时间中的数值满足了条件,便进行下一步操作。在FlinkCEP中通过 patter.where()、pattern.or()、及patter.until()方法来为 Pattern 指定条件,且 Pattern 条件有 Iterative Conditions 、 Simple Conditions 及 Combining Conditions 三中类型。

迭代条件

Iterative Conditions 能够对前面模式所有接收的事件进行处理,根据接收的事件集合统计出计算指标,并作为本次模式匹配中的条件输入参数。如:

1
2
3
4
5
6
7
.oneOrMore
.subtype(classOf[TempEvent])
.where(
(value , ctx) => {
// the condition for you
}
)

通过 subtype 将 Event 事件转换为 TempEvent 事件,然后在 where 条件中通过使用 ctx.getEventsForPattern(…) 方法获取 “middle” 模式所有接收得到 Event 记录,并基于这些 Event 数据之上对温度求取平均值,然后判断当前事件的温度是否小于平均值,然后判断当前事件的温度是否小于平均值。

简单条件

Simple Condition 继承于 Iternative Condition 类,其主要根据事件中的字段信息进行判断,决定是否接受该事件。如下:

1
start.where(event=>event.enevtType.equals("temperature"))

同样,我们可以通过 subtype 对事件进行子类类型转换,然后在 where 方法中针对子类定义模式条件。

组合条件

组合条件是将简单条件进行合并,通常情况也可以使用 where 方法进行条件组合,默认每个条件通过 AND 逻辑相连。如果需要使用 OR 逻辑 , 如:

1
pattern.where(event => event.name.startWith("foo").or(event => enevt.eventType.equals("temperature")))

终止条件

如果程序中使用了 oneOrMore 或者 oneOrMore().optional() 方法,则必须指定终止条件,否则模式中的规则会一直循环下去,如:

1
patern.oneOrMore().until(event => event.name.equals("end"))

请注意:在上述的迭代条件通过调用 ctx.getEventsForPattern(“middle”)

模式序列

将互相独立的模式进行组合然后形成模式序列。模式序列基本的编写方式和独立模式一致,各个模式之间通过邻近条件进行连接即可。其中有严格邻近,宽松临近,非确定宽松临近三种临近连接条件,如下:

1
val start : Pattern[]

严格邻近

严格邻近条件中,需要所有的事件都按照满足模式条件,不允许忽略任意不满足的模式。如下:在start Pattern 后使用 next 方法指定 下一个 Pattern ,生成严格邻近的 Pattern.

1
val strict : Pattern[Event,_] = start.next("middle").where(...)

宽松邻近

在宽松邻近条件下,会忽略没有成功匹配模式条件,并不会像严格邻近要求的那么高,可以简单理解为 OR 的逻辑关系。如下:

1
val strict : Pattern[Event,_] = start.followeBy("middle").where(...)

非确定宽松邻近

1
2
3
4
和宽松邻近条件相比,非确定宽松邻近条件指在模式匹配过程中可以忽略已经匹配的条件。如下:

​```scala
val nonDetermin : Pattern[Event,_] = start.followerByAny("middle").where(....)

除了上述条件外, Flink 还提供了 notNext()、notFollowerBy()等链接条件 。notNext() 表示不想让某一模式跟另一个模式之后不发生;notFollowerBy() 强调不想让某一模式触发处于两个模式之间触发。

注意点:模式序列不能以 notFollowerBy() 结尾,且 not 类型的模式不能和 optional 关键字同时使用

模式组

模式序列可以作为 begin , followerBy , floowerByAny 及 next 等连接条件的输入参数从而形成的模式组。在GroupPattern 上可以指定 oneOrMore 、 times 、 optional 等循环条件,应用在 GroupPattern 中的模式序列上,每个模式序列完成自己内部的条件匹配,最后在米欧式组层面对模型序列结果进行汇总。如:

1
2
3
4
5
6
7
8
9
10
val value: GroupPattern[Event, _] = Pattern.begin(Pattern.begin[Event]("start")
.where(_.name.equals("name"))
.followedBy("start_middle")
.where(_.name.equals("yang")))

val value1: Pattern[Event, _] = Pattern.begin(Pattern.begin[Event]("start")
.next("next_start")
.where(_.name.equals("name"))
.followedBy("next_middle")
.where(_.name.equals("yang"))).times(3)

AfterMatchSkipStrategy

在给定的 Pattern 中,当同一事件符合多种模式条件组合之后,需要指定 AfterMatchSkipStrategy 策略以处理已经匹配的事件。在 AfterMatchSkipStrategy 配置中有四件事件处理策略,分别为 NO_SKIP / SKIP_PAST_LAST_EVENT / SKIP_TO_FIRST / SKIP_TO_LAST 。 每种策略的定义和使用方式如下:其中SKIP_TO_FIRST 和 SKIP_TO_LAST 在定义过程中需要指定有效的PatternName.

  • NO_SKIP: 该策略表示将所有可能匹配的事件进行输出,不忽略任何一条。
1
AfterMatchSkipStrategy.noSkip()
  • SKIP_PAST_LAST_EVENT: 该策略表示忽略从模式条件开始触发到当前触发 Pattern 中的所有部分匹配事件。
1
AfterMatchSkipStrategy.skipPastLastEvent()
  • SKIP_TO_FIRST: 该策略表示忽略第一个匹配指定 PatternName 的 Pattern 其之前的部分匹配事件。
1
AfterMatchSkipStrategy.skipToFirst(patternName)
  • SKIP_TO_LAST 该策略表示忽略最后一个匹配指定 PatternName 的 Pattern 之前的部分匹配之间
1
AfterMatchSkipStrategy.skipToLast(patternName)
  • SKIP_TO_NEXT: 该策略表示忽略指定 PatternName 的 Pattern 之后的部分匹配事件
1
AfterMatchSkipStrategy.skipToNext(patternName)

选择完 AfterMatchSkipStrategy 之后,可以再创建 Pattern 时 , 通过 begin 方法中指定 skipStrategy , 然后就可以将 AfterMatchSkipStrategy 应用到当前的 Pattern 中。

1
2
val skipStrategy = { }
Pattern.begin("pattern_name" , skipStrategy)

事件获取

对于前面已经定义的模式序列或模式组,需要和输入数据流进行结合,才能发现事件中潜在的匹配关系。如:

1
2
3
4
5
val input : DataStream[Event] = ...
val pattern : Pattern[Event, _] = ...
var comparator : EventComparator[Event] = ... // optional

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern, comparator)

FlinkCEP 提供了 CEP.pattern 方法将 DataStream 和 Pattern 应用在一起,得到 PatternStream 类型数据集,且后续时间数据获取都基于PatternStream 进行。另外可以选择创建 EventComparator , 对传入的 Pattern 中的事件 进行排序,当 Event Time 相等或者同时 到达 Pattern 时 , EventComparator 钟定一的排序策略可以帮助事件的先后顺序。

当可以 CEP.pattern 方法被执行后,会生成 PatternStream 数据集,该数据集中包含了所有匹配事件。目前在FlinkCEP中提供了 select 和 flatSelect 两种方法从 PatternStream 提取事件结果。

通过 Select Function 抽取正常事件

可以通过在 PatternStream 的 Select 方法中传入自定义 Seclect Function 完成对匹配事件的转换与输出。其中 Select Function 的输入参数为 Map[String,Iterable[IN]],Map 中的 Key 为模式序列中的 Pattern 名称, Value 为对应 Pattern 所接受的事件集合,格式为输入事件的数据类型。需要注意的是: Select Funtion将会在每次调用后仅输出一条结果 如下:

1
2
3
4
5
6
7
8
def selectFunction (pattern:Map[String,Iterable[IN]]):OUT = {
// 获取 pattern 中的 startEvent
val startEvent = pattern.get("start_pattern").get.next
// 获取 pattern 中的 middleEvent
val middleEvent = pattern.get("middle_pattern").get.next
// 返回结果
OUT(startEvent , middleEvent)
}

通过 Select Function 抽取超时事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
val patternStream: PatternStream[LoginEvent] = CEP.pattern(loginEventSource, pattern)
// 创建 OutputTag ,并命名为 timeout-output
val timeoutTag: OutputTag[String] = OutputTag[String]("timeout-output")
// 调用 PatternStream Select() 并指定 timeoutTag
patternStream.select(timeoutTag) {
// 超时时间获取
(pattern: Map[String, Iterable[LoginEvent]], timestamp: Long) => {
TimeOutEvent()
}

}{
(pattern: Map[String, Iterable[LoginEvent]], timestamp: Long) => {
NormalEvent()
}
// 调用 getSideOutput 方法,并指定 timeoutTag 将超时事件输出
val timeoutResult : DataStream[TimeOutEvent] = result.getSideOutput(timeoutTag)
}

通过 Flat Select Function 抽取正常事件

Flat Seclect Function 和 Select Function 相似,不过 Flat Select Function 在每次调用可以返回任意数量的结果。因为 Flat Select Function 使用 Collector 作为返回结果的容器,可以将需要输出的事件都放置在 Collector 中返回。如下:

1
2
3
4
5
6
7
8
9
10
def faltSelectFunction(pattern:Map[String,Iterable[IN]],collector:Collector[OUT])={
// 获取 pattern 中的 startEvent
val startEvent = pattern.get("start_pattern").get.next
// 获取 pattern 中的 middleEvent
val middleEvent = pattern.get("middle_pattern").get.next
// 根据 startEvent 返回结果
for (i <- 0 to startEvent.value){
collector.collect( OUT(startEvent , middleEvent))
}
}

通过 Flat Select Function 抽取超时事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
val patternStream: PatternStream[LoginEvent] = CEP.pattern(loginEventSource, pattern)
// 创建 OutputTag ,并命名为 timeout-output
val timeoutTag: OutputTag[String] = OutputTag[String]("timeout-output")
// 调用 PatternStream Select() 并指定 timeoutTag
patternStream.select(timeoutTag) {
// 超时时间获取
(pattern: Map[String, Iterable[LoginEvent]], timestamp: Long , out:Collector[TimeoutEvent]) => {
out.collect(TimeOutEvent())
}

}{
(pattern: Map[String, Iterable[LoginEvent]], timestamp: Long) => {
out.collect(NormalEvent())
}
// 调用 getSideOutput 方法,并指定 timeoutTag 将超时事件输出
val timeoutResult : DataStream[TimeOutEvent] = result.getSideOutput(timeoutTag)
}