CEP - Complex Event Processing复杂事件处理。
订单下单后超过一定时间还未进行支付确认。
打车订单生成后超过一定时间没有确认上车。
外卖超过预定送达时间一定时限还没有确认送达。
Apache FlinkCEP API
CEPTimeoutEventJob
FlinkCEP源码简析
DataStream和PatternStream
DataStream 一般由相同类型事件或元素组成,一个DataStream可以通过一系列的转换操作如Filter、Map等转换为另一个DataStream。
PatternStream 是对CEP模式匹配的流的抽象,把DataStream和Pattern组合在一块,然后对外提供select和flatSelect等方法。PatternStream并不是DataStream,它提供方法把匹配的模式序列和与其相关联的事件组成的映射(就是Map<模式名称,List<事件>>)发出去,发到SingleOutputStreamOperator里面,SingleOutputStreamOperator是DataStream。
CEPOperatorUtils工具类里的方法和变量使用了「PatternStream」来命名,比如:
public
static
<IN, OUT>
SingleOutputStreamOperator
<OUT> createPatternStream(...){...}
public
static
<IN, OUT1, OUT2>
SingleOutputStreamOperator
<OUT1> createTimeoutPatternStream(...){...}
final
SingleOutputStreamOperator
<OUT> patternStream;
SingleOutputStreamOperator
@Public
public
class
SingleOutputStreamOperator
<T>
extends
DataStream
<T> {...}
PatternStream的构造方法:
PatternStream
(
final
DataStream
<T> inputStream,
final
Pattern
<T, ?> pattern) {
this
.inputStream = inputStream;
this
.pattern = pattern;
this
.comparator =
null
;
}
PatternStream
(
final
DataStream
<T> inputStream,
final
Pattern
<T, ?> pattern,
final
EventComparator
<T> comparator) {
this
.inputStream = inputStream;
this
.pattern = pattern;
this
.comparator = comparator;
}
Pattern、Quantifier和EventComparator
Pattern是模式定义的Base Class,Builder模式,定义好的模式会被NFACompiler用来生成NFA。
如果想要自己实现类似next和followedBy这种方法,比如timeEnd,对Pattern进行扩展重写应该是可行的。
public
class
Pattern
<T, F
extends
T> {
private
final
String
name;
private
final
Pattern
<T, ?
extends
T> previous;
private
IterativeCondition
<F> condition;
private
Time
windowTime;
private
Quantifier
quantifier =
Quantifier
.one(
ConsumingStrategy
.STRICT);
private
IterativeCondition
<F> untilCondition;
private
Times
times;
// 匹配到事件之后的跳过策略
private
final
AfterMatchSkipStrategy
afterMatchSkipStrategy;
...
}
Quantifier是用来描述具体模式行为的,主要有三大类:
Single-单一匹配、Looping-循环匹配、Times-一定次数或者次数范围内都能匹配到。
每一个模式Pattern可以是optional可选的(单一匹配或循环匹配),并可以设置ConsumingStrategy。
循环和次数也有一个额外的内部ConsumingStrategy,用在模式中接收的事件之间。
public
class
Quantifier
{
...
public
enum
QuantifierProperty
{
SINGLE,
LOOPING,
TIMES,
OPTIONAL,
GREEDY
}
public
enum
ConsumingStrategy
{
STRICT,
SKIP_TILL_NEXT,
SKIP_TILL_ANY,
NOT_FOLLOW,
NOT_NEXT
}
public
static
class
Times
{
private
final
int
from;
private
final
int
to;
private
Times
(
int
from,
int
to) {
Preconditions
.checkArgument(from >
0
,
"The from should be a positive number greater than 0."
);
Preconditions
.checkArgument(to >= from,
"The to should be a number greater than or equal to from: "
+ from +
"."
);
this
.from = from;
this
.to = to;
}
public
int
getFrom() {
return
from;
}
public
int
getTo() {
return
to;
}
// 次数范围
public
static
Times
of(
int
from,
int
to) {
return
new
Times
(from, to);
}
// 指定具体次数
public
static
Times
of(
int
times) {
return
new
Times
(times, times);
}
@Override
public
boolean
equals(
Object
o) {
if
(
this
== o) {
return
true
;
}
if
(o ==
null
|| getClass() != o.getClass()) {
return
false
;
}
Times
times = (
Times
) o;
return
from == times.from &&
to == times.to;
}
@Override
public
int
hashCode() {
return
Objects
.hash(from, to);
}
}
...
}
EventComparator,自定义事件比较器,实现EventComparator接口。
public
interface
EventComparator
<T>
extends
Comparator
<T>,
Serializable
{
long
serialVersionUID =
1L
;
}
NFACompiler和NFA
NFACompiler提供将Pattern编译成NFA或者NFAFactory的方法,使用NFAFactory可以创建多个NFA。
public
class
NFACompiler
{
...
public
interface
NFAFactory
<T>
extends
Serializable
{
NFA<T> createNFA();
}
private
static
class
NFAFactoryImpl
<T>
implements
NFAFactory
<T> {
private
static
final
long
serialVersionUID =
8939783698296714379L
;
private
final
long
windowTime;
private
final
Collection
<
State
<T>> states;
private
final
boolean
timeoutHandling;
private
NFAFactoryImpl
(
long
windowTime,
Collection
<
State
<T>> states,
boolean
timeoutHandling) {
this
.windowTime = windowTime;
this
.states = states;
this
.timeoutHandling = timeoutHandling;
}
@Override
public
NFA<T> createNFA() {
// 一个NFA由状态集合、时间窗口的长度和是否处理超时组成
return
new
NFA<>(states, windowTime, timeoutHandling);
}
}
}
NFA:Non-deterministic finite automaton - 非确定的有限(状态)自动机。
更多内容参见
https://zh.wikipedia.org/wiki/非确定有限状态自动机
public
class
NFA<T> {
private
final
Map
<
String
,
State
<T>> states;
private
final
long
windowTime;
private
final
boolean
handleTimeout;
...
}
PatternSelectFunction和PatternFlatSelectFunction
当一个包含被匹配到的事件的映射能够通过模式名称访问到的时候,PatternSelectFunction的select()方法会被调用。模式名称是由Pattern定义的时候指定的。select()方法恰好返回一个结果,如果需要返回多个结果,则可以实现PatternFlatSelectFunction。
public
interface
PatternSelectFunction
<IN, OUT>
extends
Function
,
Serializable
{
OUT select(
Map
<
String
,
List
<IN>> pattern)
throws
Exception
;
}
PatternFlatSelectFunction,不是返回一个OUT,而是使用Collector 把匹配到的事件收集起来。
public
interface
PatternFlatSelectFunction
<IN, OUT>
extends
Function
,
Serializable
{
void
flatSelect(
Map
<
String
,
List
<IN>> pattern,
Collector
<OUT> out)
throws
Exception
;
}
SelectTimeoutCepOperator、PatternTimeoutFunction
SelectTimeoutCepOperator是在CEPOperatorUtils中调用createTimeoutPatternStream()方法时创建出来。
SelectTimeoutCepOperator中会被算子迭代调用的方法是processMatchedSequences()和processTimedOutSequences()。
模板方法...对应到抽象类AbstractKeyedCEPPatternOperator中processEvent()方法和advanceTime()方法。
还有FlatSelectTimeoutCepOperator和对应的PatternFlatTimeoutFunction。
public
class
SelectTimeoutCepOperator
<IN, OUT1, OUT2, KEY>
extends
AbstractKeyedCEPPatternOperator
<IN, KEY, OUT1,
SelectTimeoutCepOperator
.
SelectWrapper
<IN, OUT1, OUT2>> {
private
OutputTag
<OUT2> timedOutOutputTag;
public
SelectTimeoutCepOperator
(
TypeSerializer
<IN> inputSerializer,
boolean
isProcessingTime,
NFACompiler
.
NFAFactory
<IN> nfaFactory,
final
EventComparator
<IN> comparator,
AfterMatchSkipStrategy
skipStrategy,
// 参数命名混淆了flat...包括SelectWrapper类中的成员命名...
PatternSelectFunction
<IN, OUT1> flatSelectFunction,
PatternTimeoutFunction
<IN, OUT2> flatTimeoutFunction,
OutputTag
<OUT2> outputTag,
OutputTag
<IN> lateDataOutputTag) {
super
(
inputSerializer,
isProcessingTime,
nfaFactory,
comparator,
skipStrategy,
new
SelectWrapper
<>(flatSelectFunction, flatTimeoutFunction),
lateDataOutputTag);
this
.timedOutOutputTag = outputTag;
}
...
}
public
interface
PatternTimeoutFunction
<IN, OUT>
extends
Function
,
Serializable
{
OUT timeout(
Map
<
String
,
List
<IN>> pattern,
long
timeoutTimestamp)
throws
Exception
;
}
public
interface
PatternFlatTimeoutFunction
<IN, OUT>
extends
Function
,
Serializable
{
void
timeout(
Map
<
String
,
List
<IN>> pattern,
long
timeoutTimestamp,
Collector
<OUT> out)
throws
Exception
;
}
CEP和CEPOperatorUtils
CEP是创建PatternStream的工具类,PatternStream只是DataStream和Pattern的组合。
public
class
CEP {
public
static
<T>
PatternStream
<T> pattern(
DataStream
<T> input,
Pattern
<T, ?> pattern) {
return
new
PatternStream
<>(input, pattern);
}
public
static
<T>
PatternStream
<T> pattern(
DataStream
<T> input,
Pattern
<T, ?> pattern,
EventComparator
<T> comparator) {
return
new
PatternStream
<>(input, pattern, comparator);
}
}
CEPOperatorUtils是在PatternStream的select()方法和flatSelect()方法被调用的时候,去创建SingleOutputStreamOperator(DataStream)。
public
class
CEPOperatorUtils
{
...
private
static
<IN, OUT, K>
SingleOutputStreamOperator
<OUT> createPatternStream(
final
DataStream
<IN> inputStream,
final
Pattern
<IN, ?> pattern,
final
TypeInformation
<OUT> outTypeInfo,
final
boolean
timeoutHandling,
final
EventComparator
<IN> comparator,
final
OperatorBuilder
<IN, OUT> operatorBuilder) {
final
TypeSerializer
<IN> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());
// check whether we use processing time
final
boolean
isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() ==
TimeCharacteristic
.
ProcessingTime
;
// compile our pattern into a NFAFactory to instantiate NFAs later on
final
NFACompiler
.
NFAFactory
<IN> nfaFactory =
NFACompiler
.compileFactory(pattern, timeoutHandling);
final
SingleOutputStreamOperator
<OUT> patternStream;
if
(inputStream
instanceof
KeyedStream
) {
KeyedStream
<IN, K> keyedStream = (
KeyedStream
<IN, K>) inputStream;
patternStream = keyedStream.transform(
operatorBuilder.getKeyedOperatorName(),
outTypeInfo,
operatorBuilder.build(
inputSerializer,
isProcessingTime,
nfaFactory,
comparator,
pattern.getAfterMatchSkipStrategy()));
}
else
{
KeySelector
<IN,
Byte
> keySelector =
new
NullByteKeySelector
<>();
patternStream = inputStream.keyBy(keySelector).transform(
operatorBuilder.getOperatorName(),
outTypeInfo,
operatorBuilder.build(
inputSerializer,
isProcessingTime,
nfaFactory,
comparator,
pattern.getAfterMatchSkipStrategy()
)).forceNonParallel();
}
return
patternStream;
}
...
}
FlinkCEP实现步骤
- IN: DataSource -> DataStream -> Transformations -> DataStream
- Pattern: Pattern.begin.where.next.where...times...
- PatternStream: CEP.pattern(DataStream, Pattern)
- DataStream: PatternStream.select(PatternSelectFunction) PatternStream.flatSelect(PatternSelectFunction)
- OUT: DataStream -> Transformations -> DataStream -> DataSink
FlinkCEP匹配超时实现步骤
TimeoutCEP的流需要keyBy,即KeyedStream,如果inputStream不是KeyedStream,会new一个0字节的Key(上面CEPOperatorUtils源码里有提到)。
KeySelector
<IN,
Byte
> keySelector =
new
NullByteKeySelector
<>();
Pattern最后调用within设置窗口时间。 如果是对主键进行分组,一个时间窗口内最多只会匹配出一个超时事件,使用PatternStream.select(...)就可以了。
- IN: DataSource -> DataStream -> Transformations -> DataStream -> keyBy -> KeyedStream
- Pattern: Pattern.begin.where.next.where...within(Time windowTime)
- PatternStream: CEP.pattern(KeyedStream, Pattern)
- OutputTag: new OutputTag(...)
- SingleOutputStreamOperator: PatternStream.flatSelect(OutputTag, PatternFlatTimeoutFunction, PatternFlatSelectFunction)
- DataStream: SingleOutputStreamOperator.getSideOutput(OutputTag)
- OUT: DataStream -> Transformations -> DataStream -> DataSink
FlinkCEP超时不足
和Flink窗口聚合类似,如果使用事件时间和依赖事件生成的水印向前推进,需要后续的事件到达,才会触发窗口进行计算和输出结果。
FlinkCEP超时完整demo
public
class
CEPTimeoutEventJob
{
private
static
final
String
LOCAL_KAFKA_BROKER =
"localhost:9092"
;
private
static
final
String
GROUP_ID =
CEPTimeoutEventJob
.
class
.getSimpleName();
private
static
final
String
GROUP_TOPIC = GROUP_ID;
public
static
void
main(
String
[] args)
throws
Exception
{
// 参数
ParameterTool
params =
ParameterTool
.fromArgs(args);
StreamExecutionEnvironment
env =
StreamExecutionEnvironment
.getExecutionEnvironment();
// 使用事件时间
env.setStreamTimeCharacteristic(
TimeCharacteristic
.
EventTime
);
env.enableCheckpointing(
5000
);
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig
.
ExternalizedCheckpointCleanup
.RETAIN_ON_CANCELLATION);
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(
RestartStrategies
.fixedDelayRestart(
5
,
10000
));
// 不使用POJO的时间
final
AssignerWithPeriodicWatermarks
extractor =
new
IngestionTimeExtractor
<POJO>();
// 与Kafka Topic的Partition保持一致
env.setParallelism(
3
);
Properties
kafkaProps =
new
Properties
();
kafkaProps.setProperty(
"bootstrap.servers"
, LOCAL_KAFKA_BROKER);
kafkaProps.setProperty(
"group.id"
, GROUP_ID);
// 接入Kafka的消息
FlinkKafkaConsumer011
<POJO> consumer =
new
FlinkKafkaConsumer011
<>(GROUP_TOPIC,
new
POJOSchema
(), kafkaProps);
DataStream
<POJO> pojoDataStream = env.addSource(consumer)
.assignTimestampsAndWatermarks(extractor);
pojoDataStream.print();
// 根据主键aid分组 即对每一个POJO事件进行匹配检测【不同类型的POJO,可以采用不同的within时间】
// 1.
DataStream
<POJO> keyedPojos = pojoDataStream
.keyBy(
"aid"
);
// 从初始化到终态-一个完整的POJO事件序列
// 2.
Pattern
<POJO, POJO> completedPojo =
Pattern
.<POJO>begin(
"init"
)
.where(
new
SimpleCondition
<POJO>() {
private
static
final
long
serialVersionUID = -
6847788055093903603L
;
@Override
public
boolean
filter(POJO pojo)
throws
Exception
{
return
"02"
.equals(pojo.getAstatus());
}
})
.followedBy(
"end"
)
// .next("end")
.where(
new
SimpleCondition
<POJO>() {
private
static
final
long
serialVersionUID = -
2655089736460847552L
;
@Override
public
boolean
filter(POJO pojo)
throws
Exception
{
return
"00"
.equals(pojo.getAstatus()) ||
"01"
.equals(pojo.getAstatus());
}
});
// 找出1分钟内【便于测试】都没有到终态的事件aid
// 如果针对不同类型有不同within时间,比如有的是超时1分钟,有的可能是超时1个小时 则生成多个PatternStream
// 3.
PatternStream
<POJO> patternStream = CEP.pattern(keyedPojos, completedPojo.within(
Time
.minutes(
1
)));
// 定义侧面输出timedout
// 4.
OutputTag
<POJO> timedout =
new
OutputTag
<POJO>(
"timedout"
) {
private
static
final
long
serialVersionUID =
773503794597666247L
;
};
// OutputTag<L> timeoutOutputTag, PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction, PatternFlatSelectFunction<T, R> patternFlatSelectFunction
// 5.
SingleOutputStreamOperator
<POJO> timeoutPojos = patternStream.flatSelect(
timedout,
new
POJOTimedOut
(),
new
FlatSelectNothing
()
);
// 打印输出超时的POJO
// 6.7.
timeoutPojos.getSideOutput(timedout).print();
timeoutPojos.print();
env.execute(
CEPTimeoutEventJob
.
class
.getSimpleName());
}
public
static
class
POJOTimedOut
implements
PatternFlatTimeoutFunction
<POJO, POJO> {
private
static
final
long
serialVersionUID = -
4214641891396057732L
;
@Override
public
void
timeout(
Map
<
String
,
List
<POJO>> map,
long
l,
Collector
<POJO> collector)
throws
Exception
{
if
(
null
!= map.get(
"init"
)) {
for
(POJO pojoInit : map.get(
"init"
)) {
System
.out.println(
"timeout init:"
+ pojoInit.getAid());
collector.collect(pojoInit);
}
}
// 因为end超时了,还没收到end,所以这里是拿不到end的
System
.out.println(
"timeout end: "
+ map.get(
"end"
));
}
}
public
static
class
FlatSelectNothing
<T>
implements
PatternFlatSelectFunction
<T, T> {
private
static
final
long
serialVersionUID = -
3029589950677623844L
;
@Override
public
void
flatSelect(
Map
<
String
,
List
<T>> pattern,
Collector
<T> collector) {
System
.out.println(
"flatSelect: "
+ pattern);
}
}
}
测试结果(followedBy):
3
> POJO{aid=
'ID000-0'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563419728242
, energy=
529.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'02'
, createTime=
null
, updateTime=
null
}
3
> POJO{aid=
'ID000-1'
, astyle=
'STYLE000-2'
, aname=
'NAME-1'
, logTime=
1563419728783
, energy=
348.00
, age=
26
, tt=
2019
-
07
-
18
, astatus=
'02'
, createTime=
null
, updateTime=
null
}
3
> POJO{aid=
'ID000-0'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563419749259
, energy=
492.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'00'
, createTime=
null
, updateTime=
null
}
flatSelect: {init=[POJO{aid=
'ID000-0'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563419728242
, energy=
529.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'02'
, createTime=
null
, updateTime=
null
}],
end
=[POJO{aid=
'ID000-0'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563419749259
, energy=
492.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'00'
, createTime=
null
, updateTime=
null
}]}
timeout init:ID000-
1
3
> POJO{aid=
'ID000-1'
, astyle=
'STYLE000-2'
, aname=
'NAME-1'
, logTime=
1563419728783
, energy=
348.00
, age=
26
, tt=
2019
-
07
-
18
, astatus=
'02'
, createTime=
null
, updateTime=
null
}
timeout
end
:
null
3
> POJO{aid=
'ID000-2'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563419829639
, energy=
467.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'03'
, createTime=
null
, updateTime=
null
}
3
> POJO{aid=
'ID000-2'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563419841394
, energy=
107.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'00'
, createTime=
null
, updateTime=
null
}
3
> POJO{aid=
'ID000-3'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563419967721
, energy=
431.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'02'
, createTime=
null
, updateTime=
null
}
3
> POJO{aid=
'ID000-3'
, astyle=
'STYLE000-2'
, aname=
'NAME-0'
, logTime=
1563419979567
, energy=
32.00
, age=
26
, tt=
2019
-
07
-
18
, astatus=
'03'
, createTime=
null
, updateTime=
null
}
3
> POJO{aid=
'ID000-3'
, astyle=
'STYLE000-2'
, aname=
'NAME-0'
, logTime=
1563419993612
, energy=
542.00
, age=
26
, tt=
2019
-
07
-
18
, astatus=
'01'
, createTime=
null
, updateTime=
null
}
flatSelect: {init=[POJO{aid=
'ID000-3'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563419967721
, energy=
431.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'02'
, createTime=
null
, updateTime=
null
}],
end
=[POJO{aid=
'ID000-3'
, astyle=
'STYLE000-2'
, aname=
'NAME-0'
, logTime=
1563419993612
, energy=
542.00
, age=
26
, tt=
2019
-
07
-
18
, astatus=
'01'
, createTime=
null
, updateTime=
null
}]}
3
> POJO{aid=
'ID000-4'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563420063760
, energy=
122.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'02'
, createTime=
null
, updateTime=
null
}
3
> POJO{aid=
'ID000-4'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563420078008
, energy=
275.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'03'
, createTime=
null
, updateTime=
null
}
timeout init:ID000-
4
3
> POJO{aid=
'ID000-4'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563420063760
, energy=
122.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'02'
, createTime=
null
, updateTime=
null
}
timeout
end
:
null
总结
以上所述是小编给大家介绍的Apache FlinkCEP 实现超时状态监控的步骤,希望对大家有所帮助,如果大家有任何疑问欢迎给我留言,小编会及时回复大家的!