COMPLEX EVENT PROCESSING
WITH APACHE FLINK
KOSTAS KLOUDAS
COMMITTER @ APACHE FLINK
SOFTWARE ENGINEER @ DATA ARTISANS
ABOUT DATA ARTISANS
Original creators of Open Source Apache Flink
Apache Flink® + dA Application Manager
2 © 2018 data Artisans
WHAT IS CEP?
CEP: COMPLEX EVENT PROCESSING
• Detecting event patterns
• Over continuous streams of events
• Often arriving out-of-order
CEP: COMPLEX EVENT PROCESSING
[Figure: a pattern is matched against an input stream of events; the matching event sequences form the output]
CEP: EXAMPLES
• Security: Raise an alert when a user has X consecutive failed logins within Y seconds, or prints more than Z important documents within Y seconds.
• Supply Chain Monitoring: Detect all intermediate stops of all shipments that started from a contaminated site, or report all shipments that took more than Y hours in total.
• Stock Trends: Detect stocks that start high, keep a high average price for a period, and whose trading volume then plummets.
CEP: USE-CASES
• IoT
• Fraud Detection
• Intrusion Detection
• Inventory Management
• Click Stream Analysis
• Trend Detection in the financial sector
• ...yours?
WHAT IS STREAM PROCESSING?
STREAM PROCESSING
• Computations on never-ending “streams” of events
DISTRIBUTED STREAM PROCESSING
• Computation spread across many machines
STATEFUL STREAM PROCESSING
• The computation keeps state: its result depends on the history of the stream
Stream processors are a natural fit for CEP
CEP ON FLINK
[Figure: FlinkCEP takes an input stream and a pattern, and emits the matches as output]
What does FlinkCEP offer?
PATTERN DEFINITION
• Composed of Individual Patterns, e.g. P1 and P2
‒ P1(shape == rectangle)
‒ P2(shape == triangle)
• Combined by Contiguity Conditions
‒ ...covered later
FLINKCEP INDIVIDUAL PATTERNS
• Unique Name
• Condition: which elements to accept
‒ Simple, e.g. shape == rectangle
‒ Iterative, e.g. rectangle.surface < triangle.surface
• Quantifiers (or none: the pattern then matches exactly one event)
‒ Looping/Optional: oneOrMore(), times(#), times(from, to), optional(), until(cond.), greedy()
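To make the difference between the two kinds of condition concrete, here is a minimal plain-Java sketch (no Flink dependency) of a two-step pattern "rectangle followed by a larger triangle". The Shape and ConditionsDemo types are hypothetical; FlinkCEP expresses the same ideas with its SimpleCondition and IterativeCondition classes, whose actual signatures differ from the predicates used here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;
import java.util.function.Predicate;

// Hypothetical event type for this sketch (not a Flink class).
final class Shape {
    final String kind;
    final double surface;

    Shape(String kind, double surface) {
        this.kind = kind;
        this.surface = surface;
    }
}

final class ConditionsDemo {
    // Simple condition: decided by looking at the current event alone.
    static final Predicate<Shape> IS_RECTANGLE = s -> s.kind.equals("rectangle");

    // Iterative condition: also looks at the events already accepted into the
    // partial match (here: every accepted rectangle must be smaller).
    static final BiPredicate<Shape, List<Shape>> IS_LARGER_TRIANGLE = (s, accepted) ->
            s.kind.equals("triangle")
                    && accepted.stream().allMatch(r -> r.surface < s.surface);

    // Returns the first [rectangle, triangle] match, skipping irrelevant events, or null.
    static List<Shape> firstMatch(List<Shape> input) {
        for (int i = 0; i < input.size(); i++) {
            if (!IS_RECTANGLE.test(input.get(i))) {
                continue;
            }
            List<Shape> accepted = new ArrayList<>();
            accepted.add(input.get(i));
            for (int j = i + 1; j < input.size(); j++) {
                if (IS_LARGER_TRIANGLE.test(input.get(j), accepted)) {
                    accepted.add(input.get(j));
                    return accepted;
                }
            }
        }
        return null;
    }
}
```

For the input rectangle(4), triangle(3), triangle(6), the first triangle fails the iterative condition because it is smaller than the rectangle already accepted, so the match is rectangle(4), triangle(6).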
FLINKCEP COMPLEX PATTERNS
• Combine Individual Patterns
‒ Contiguity Conditions: how to select relevant events from an input that mixes relevant and irrelevant events
‒ Matching Semantics: where to start searching for a new match
‒ Time Constraints: within(time), e.g. all events must arrive within 24h
• Combine Complex Patterns
‒ Grouping Complex Patterns: the “parenthesis” of pattern definition
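Matching semantics decide where the search for the next match starts once a match has been found. The plain-Java sketch below (not FlinkCEP code; class and method names are hypothetical) finds "one or more b events immediately followed by c" and compares two policies: starting a new search at every event, or resuming right after the last event of the previous match. FlinkCEP exposes comparable choices as after-match skip strategies, for example AfterMatchSkipStrategy.noSkip() and skipPastLastEvent(); this sketch does not reproduce their exact behavior.

```java
import java.util.ArrayList;
import java.util.List;

final class MatchingSemanticsDemo {
    // Tries to match "one or more b events, then c" (strictly consecutive)
    // starting at index i. Returns the index of the closing "c", or -1.
    static int matchFrom(List<String> in, int i) {
        int j = i;
        while (j < in.size() && in.get(j).startsWith("b")) {
            j++;
        }
        if (j == i || j == in.size() || !in.get(j).equals("c")) {
            return -1;
        }
        return j;
    }

    // skipPastLast == false: a new search may start at every event.
    // skipPastLast == true: after a match, resume after its last event.
    static List<List<String>> findAll(List<String> in, boolean skipPastLast) {
        List<List<String>> out = new ArrayList<>();
        int i = 0;
        while (i < in.size()) {
            int end = matchFrom(in, i);
            if (end >= 0) {
                out.add(new ArrayList<>(in.subList(i, end + 1)));
                i = skipPastLast ? end + 1 : i + 1;
            } else {
                i++;
            }
        }
        return out;
    }
}
```

On the input b1, b2, b3, c the first policy yields [b1, b2, b3, c], [b2, b3, c] and [b3, c]; the second yields only [b1, b2, b3, c].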
FLINKCEP CONTIGUITY CONDITIONS
Strict Contiguity
• Matching events strictly follow each other, with no non-matching events in between
FLINKCEP CONTIGUITY CONDITIONS
Relaxed Contiguity
• Non-matching events in between are simply ignored
FLINKCEP CONTIGUITY CONDITIONS
Non-Deterministic Relaxed Contiguity
• Allows non-deterministic actions on relevant events: a matching event may be either used or skipped, which yields additional matches
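The three contiguity modes can be compared on a two-step pattern "a b" and the input a, c, b1, b2. The sketch below is plain Java written for illustration, not FlinkCEP's implementation; the class, its methods and the encoding of events as strings are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ContiguityDemo {
    // Hypothetical two-step pattern: an "a" event, then an event whose name starts with "b".
    static boolean isA(String e) {
        return e.equals("a");
    }

    static boolean isB(String e) {
        return e.startsWith("b");
    }

    // Strict contiguity: the "b" event must immediately follow the "a" event.
    static List<List<String>> strict(List<String> in) {
        List<List<String>> out = new ArrayList<>();
        for (int i = 0; i + 1 < in.size(); i++) {
            if (isA(in.get(i)) && isB(in.get(i + 1))) {
                out.add(Arrays.asList(in.get(i), in.get(i + 1)));
            }
        }
        return out;
    }

    // Relaxed (firstOnly = true): skip irrelevant events, use the first "b" after each "a".
    // Non-deterministic relaxed (firstOnly = false): every later "b" yields its own match.
    static List<List<String>> relaxed(List<String> in, boolean firstOnly) {
        List<List<String>> out = new ArrayList<>();
        for (int i = 0; i < in.size(); i++) {
            if (!isA(in.get(i))) {
                continue;
            }
            for (int j = i + 1; j < in.size(); j++) {
                if (isB(in.get(j))) {
                    out.add(Arrays.asList(in.get(i), in.get(j)));
                    if (firstOnly) {
                        break;
                    }
                }
            }
        }
        return out;
    }
}
```

On a, c, b1, b2, strict contiguity finds nothing (c sits between a and the b events), relaxed contiguity finds [a, b1], and non-deterministic relaxed contiguity finds both [a, b1] and [a, b2].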
FLINKCEP CONTIGUITY CONDITIONS
NOT patterns
• For strict (notNext()) and relaxed (notFollowedBy()) contiguity
• For cases where an event should invalidate a match
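A NOT pattern discards a partial match as soon as a forbidden event shows up. The following plain-Java sketch illustrates the relaxed case (the idea behind notFollowedBy()): "a", then eventually "b", with no "c" anywhere in between. It is a simplified illustration that tracks a single open partial match, not FlinkCEP's implementation, and its names are hypothetical.

```java
import java.util.List;

final class NotPatternDemo {
    // True if the input contains "a" ... "b" (relaxed) with no "c" in between.
    static boolean matchesAThenBWithoutC(List<String> input) {
        boolean partialMatchOpen = false;
        for (String e : input) {
            if (e.equals("a")) {
                partialMatchOpen = true;   // a new "a" (re)starts the partial match
            } else if (partialMatchOpen && e.equals("c")) {
                partialMatchOpen = false;  // forbidden event: invalidate the partial match
            } else if (partialMatchOpen && e.equals("b")) {
                return true;               // "b" completes the match
            }
            // any other event is irrelevant and skipped (relaxed contiguity)
        }
        return false;
    }
}
```

With this sketch, a, x, b matches; a, c, b does not, because c invalidates the partial match; a, c, a, b matches again thanks to the second a.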
FLINKCEP
• Individual Patterns:
‒ Quantifiers
‒ Conditions: Simple & Iterative
• Complex Patterns:
‒ Time Constraints: event time and processing time
‒ Contiguity Conditions: strict, relaxed, non-deterministic relaxed, NOT
‒ Matching Semantics: where to start searching for a new match
‒ Grouping Complex Patterns: the “parenthesis” of pattern definition
What is coming in FlinkCEP?
ROADMAP
• Integration with SQL:
‒ the MATCH_RECOGNIZE clause of SQL:2016
‒ write SQL and detect CEP patterns on the result
• Dynamic pattern specification:
‒ add() and remove() patterns evaluated against your stream
Example
RUNNING EXAMPLE: RETAILER
• Trace all shipments that:
‒ start at location A
‒ have at least 5 stops
‒ end at location B
‒ within the last 24h
[Figure: a shipment route A → M1 → M2 → M3 → M4 → M5 → B]
OBSERVATION A: INDIVIDUAL PATTERNS
• Trace all shipments that:
‒ start at location A
‒ have at least 5 stops
‒ end at location B
‒ within the last 24h
• One individual pattern per part of the trace:
‒ Start: ev.from == A
‒ Middle: ev[i].from == ev[i-1].to
‒ End: ev.to == B && size(“middle”) >= 5
OBSERVATION B: QUANTIFIERS
• Start/End: a single event each
• Middle: multiple events, 5 or more
OBSERVATION C: CONDITIONS
• Start -> Simple
‒ depends only on the properties of the event itself (ev.from == A)
• Middle/End -> Iterative
‒ depend on previously accepted events (ev[i].from == ev[i-1].to, size(“middle”) >= 5)
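The three conditions of the running example can be written as plain Java predicates. This is a hypothetical sketch: the Hop type and all method names are made up, and it does not use FlinkCEP's IterativeCondition API. It validates one candidate match laid out as [start, middle..., end] against the conditions exactly as the slides state them.

```java
import java.util.List;

// Hypothetical shipment event: one leg from one location to the next.
final class Hop {
    final String from;
    final String to;

    Hop(String from, String to) {
        this.from = from;
        this.to = to;
    }
}

final class RetailerConditions {
    // Simple condition for "start": only the event's own properties matter.
    static boolean start(Hop ev) {
        return ev.from.equals("A");
    }

    // Iterative condition for "middle": ev[i].from == ev[i-1].to,
    // where ev[i-1] is the previously accepted event.
    static boolean middle(Hop ev, Hop previous) {
        return ev.from.equals(previous.to);
    }

    // Iterative condition for "end": ev.to == B && size("middle") >= 5.
    static boolean end(Hop ev, int middleCount) {
        return ev.to.equals("B") && middleCount >= 5;
    }

    // Validates a candidate match laid out as [start, middle..., end].
    static boolean isMatch(List<Hop> trace) {
        if (trace.size() < 2 || !start(trace.get(0))) {
            return false;
        }
        int last = trace.size() - 1;
        for (int i = 1; i < last; i++) {
            if (!middle(trace.get(i), trace.get(i - 1))) {
                return false;
            }
        }
        return end(trace.get(last), last - 1);
    }
}
```

Only the start check looks at a single event in isolation; both the middle and end checks need information about events accepted earlier, which is what makes them iterative.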
OBSERVATION D: TIME CONSTRAINTS
• Trace all shipments that:
‒ start at location A
‒ have at least 5 stops
‒ end at location B
‒ within the last 24h
• The last requirement is a time constraint: the whole match must fit within 24h
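A time constraint such as within(Time.hours(24)) bounds the distance between the first and the last event of a match. The plain-Java sketch below shows that check on explicit event timestamps (event time); it is an illustration only, its names are hypothetical, and whether a span of exactly 24 hours still counts is an assumption of the sketch, not a statement about FlinkCEP.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

final class TimeConstraintDemo {
    // Window from the running example: the whole match must fit within 24 hours.
    static final Duration WINDOW = Duration.ofHours(24);

    // True if the candidate match's last event is less than WINDOW after its
    // first event; timestamps are assumed to be in ascending order.
    static boolean withinWindow(List<Instant> eventTimes) {
        if (eventTimes.isEmpty()) {
            return false;
        }
        Instant first = eventTimes.get(0);
        Instant last = eventTimes.get(eventTimes.size() - 1);
        return Duration.between(first, last).compareTo(WINDOW) < 0;
    }
}
```

A shipment whose last stop comes 23 hours after its first passes the check; one that takes 25 hours is discarded even if every other condition holds.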
OBSERVATION E: CONTIGUITY
• We opt for relaxed contiguity: unrelated events between the stops of a shipment are ignored
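EXAMPLE: INDIVIDUAL PATTERNS
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.windowing.time.Time;

Pattern<Event, ?> pattern = Pattern
    .<Event>begin("start")
        .where(mySimpleCondition)          // Start: simple condition
    .followedBy("middle")                  // relaxed contiguity
        .where(myIterativeCondition1)      // Middle: iterative condition
        .oneOrMore()                       // quantifier: one or more events
    .followedBy("end")                     // relaxed contiguity
        .where(myIterativeCondition2)      // End: iterative condition
    .within(Time.hours(24));               // time constraint on the whole match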
EXAMPLE: QUANTIFIERS
• "start" and "end" match a single event each; "middle" is quantified with .oneOrMore()
EXAMPLE: CONDITIONS
• "start" uses a simple condition (mySimpleCondition); "middle" and "end" use iterative ones (myIterativeCondition1, myIterativeCondition2)
EXAMPLE: TIME CONSTRAINT
• .within(Time.hours(24)) requires the whole match to happen within 24 hours
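RUNNING EXAMPLE: INTEGRATION
import java.util.List;
import java.util.Map;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;

Pattern<Event, ?> pattern = ...
// Apply the pattern to the input stream; matches are emitted as a PatternStream.
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
DataStream<Alert> result = patternStream.select(
    new PatternSelectFunction<Event, Alert>() {
        @Override
        public Alert select(Map<String, List<Event>> match) {
            // match maps each pattern name ("start", "middle", "end") to its events
            return parseMatch(match);
        }
    });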
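The select function receives each match as a map from pattern name to the events that pattern accepted. The slides do not define Event, Alert or parseMatch, so the plain-Java sketch below uses hypothetical versions of all three, only to show how such a map can be turned into an alert for the retailer example.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical types: the slides do not define Event, Alert or parseMatch.
final class Event {
    final String from;
    final String to;

    Event(String from, String to) {
        this.from = from;
        this.to = to;
    }
}

final class Alert {
    final String origin;
    final String destination;
    final int middleEvents;

    Alert(String origin, String destination, int middleEvents) {
        this.origin = origin;
        this.destination = destination;
        this.middleEvents = middleEvents;
    }
}

final class MatchParser {
    // Each key is a pattern name from the pattern definition ("start", "middle",
    // "end"); each value holds the events accepted by that pattern, in order.
    static Alert parseMatch(Map<String, List<Event>> match) {
        Event start = match.get("start").get(0); // singleton pattern: one event
        List<Event> middle = match.getOrDefault("middle", Collections.<Event>emptyList());
        Event end = match.get("end").get(0);
        return new Alert(start.from, end.to, middle.size());
    }
}
```

Because "start" and "end" carry no quantifier, their lists hold exactly one event, while the list under "middle" grows with the number of intermediate legs.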
CALL FOR PRESENTATIONS IS OPEN
THANK YOU!
@kkloudas
@dataArtisans
@ApacheFlink
WE ARE HIRING: data-artisans.com/careers
POWERED BY APACHE FLINK