FlinkCEP 中提供了 Pattern API 用于对输入流数据的复杂事件规则定义,并从事件流 中抽取事件结果。包含四个步骤:
输入事件流的创建Pattern 的定义Pattern 应用在事件流上检测选取结果
定义 Pattern 可以是单次执行模式,也可以是循环执行模式。单词执行模式一次只接受 一个事件,循环执行模式可以接收一个或者多个事件。通常情况下,可以通过指定循环次数 将单次执行模式变为循环执行模式。每种模式能够将多个条件组合应用到同一事件之上,条 件组合可以通过 where 方法进行叠加。每个 Pattern 都是通过 begin 方法定义的
val start = Pattern.begin[Event]("start_pattern")
下一步通过 Pattern.where()方法在 Pattern 上指定 Condition,只有当 Condition 满 足之后,当前的 Pattern 才会接受事件。
start.where(_.getCallType == "success")
//指定循环触发4次 start.times(4); //可以执行触发次数范围,让循环执行次数在该范围之内 start.times(2, 4);
start.times(4).optional(); start.times(2, 4).optional();
//触发2、3、4次,尽可能重复执行 start.times(2, 4).greedy(); //触发0、2、3、4次,尽可能重复执行 start.times(2, 4).optional().greedy();
// 触发一次或者多次 start.oneOrMore(); //触发一次或者多次,尽可能重复执行 start.oneOrMore().greedy(); // 触发0次或者多次 start.oneOrMore().optional(); // 触发0次或者多次,尽可能重复执行 start.oneOrMore().optional().greedy();
// 触发两次或者多次 start.timesOrMore(2); // 触发两次或者多次,尽可能重复执行 start.timesOrMore(2).greedy(); // 不触发或者触发两次以上,尽可能重复执行 start.timesOrMore(2).optional().greedy();
每个模式都需要指定触发条件,作为事件进入到该模式是否接受的判断依据,当事件中 的数值满足了条件时,便进行下一步操作。在 FlinkCFP 中通过 pattern.where()、 pattern.or()及 pattern.until()方法来为 Pattern 指定条件,且 Pattern 条件有 Simple Conditions 及 Combining Conditions 等类型。
// 把通话成功的事件挑选出来 start.where(_.getCallType == "success")
// 把通话成功,或者通话时长大于10秒的事件挑选出来 val start = Pattern.begin[StationLog]("start_pattern") .where(_.callType=="success") .or(_.duration>10)
pattern.oneOrMore.until(_.callOut.startsWith("186"))
将相互独立的模式进行组合然后形成模式序列。模式序列基本的编写方式和独立模式一致,各个模式之间通过邻近条件进行连接即可,其中有严格邻近、宽松邻近、非确定宽松邻 近三种邻近连接条件。
val strict: Pattern[Event] = start.next("middle").where(...)
val relaxed: Pattern[Event, _] = start.followedBy("middle").where(...)
val nonDetermin: Pattern[Event, _] = start.followedByAny("middle").where(...)
注意: 1、所有模式序列必须以 .begin() 开始 2、模式序列不能以 .notFollowedBy() 结束 3、“not” 类型的模式不能被 optional 所修饰 4、此外,还可以为模式指定时间约束,用来要求在多长时间内匹配有效
调用 CEP.pattern(),给定输入流和模式,就能得到一个 PatternStream
//cep 做模式检测 val patternStream = CEP.pattern[EventLog](dataStream.keyBy(_.id),pattern)
得到 PatternStream 类型的数据集后,接下来数据获取都基于 PatternStream 进行。该 数据集中包含了所有的匹配事件。目前在 FlinkCEP 中提供 select 和 flatSelect 两种方法 从 PatternStream 提取事件结果事件。