FlinkCEP - Complex event processing for Flink

FlinkCEP is the complex event processing (CEP) library for Flink. It allows you to easily detect complex event patterns in an endless stream of data. Complex events can then be constructed from the matching sequences, which gives you the opportunity to quickly get hold of what’s really important in your data.

Attention The events in the DataStream to which you want to apply pattern matching have to implement proper equals() and hashCode() methods because these are used for comparing and matching events.

Getting Started

If you want to jump right in, set up a Flink program and add the FlinkCEP dependency to the pom.xml of your project: flink-cep for Java programs, or flink-cep-scala for Scala programs.

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-cep_2.10</artifactId>
  <version>1.3-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-cep-scala_2.10</artifactId>
  <version>1.3-SNAPSHOT</version>
</dependency>

Note that FlinkCEP is currently not part of the binary distribution. For cluster execution, see the documentation on linking with libraries that are not part of the binary distribution.

Now you can start writing your first CEP program using the pattern API.

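DataStream<Event> input = ...

Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .where(new SimpleCondition<Event>() {
        @Override public boolean filter(Event evt) { return evt.getId() == 42; }
    })
    .next("middle").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
        @Override public boolean filter(SubEvent subEvt) { return subEvt.getVolume() >= 10.0; }
    })
    .followedBy("end").where(new SimpleCondition<Event>() {
        @Override public boolean filter(Event evt) { return evt.getName().equals("end"); }
    });

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

// the lambda parameter must not reuse the name of the local variable "pattern"
DataStream<Alert> result = patternStream.select(match -> createAlertFrom(match));

val input: DataStream[Event] = ...

val pattern = Pattern.begin[Event]("start").where(_.getId == 42)
  .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0)
  .followedBy("end").where(_.getName == "end")

val patternStream = CEP.pattern(input, pattern)

val result: DataStream[Alert] = patternStream.select(createAlert(_))

Note that conditions such as SimpleCondition are abstract classes, so the Java examples implement them as anonymous classes. Java 8 lambdas are used only where the API expects an interface, such as the PatternSelectFunction passed to select.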

The Pattern API

The pattern API allows you to quickly define complex event patterns.

Each pattern consists of multiple stages, or what we call states. To go from one state to the next, the user can specify conditions. These conditions can concern the contiguity of events or be a filter condition on an event.

Each pattern has to start with an initial state:

Pattern<Event, ?> start = Pattern.<Event>begin("start");
val start : Pattern[Event, _] = Pattern.begin("start")

Each state must have a unique name to identify the matched events later on. Additionally, we can specify a filter condition for the event to be accepted as the start event via the where method. These filtering conditions can be either an IterativeCondition or a SimpleCondition.

Iterative Conditions: This type of condition can iterate over the previously accepted events in the pattern and decide whether to accept a new event based on some statistic over those events.

Below is the code for an iterative condition that accepts events whose name starts with “foo” and for which the sum of the prices of the events previously accepted for the state named “middle”, plus the price of the current event, is less than 5.0. Iterative conditions can be very powerful, especially in combination with quantifiers such as oneOrMore() or zeroOrMore().

start.where(new IterativeCondition<SubEvent>() {
    @Override
    public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {
        if (!value.getName().startsWith("foo")) {
            return false;
        }
        
        double sum = value.getPrice();
        for (Event event : ctx.getEventsForPattern("middle")) {
            sum += event.getPrice();
        }
        return Double.compare(sum, 5.0) < 0;
    }
});
start.where(
    (value, ctx) => {
        lazy val sum = ctx.getEventsForPattern("middle").asScala.map(_.getPrice).sum
        value.getName.startsWith("foo") && sum + value.getPrice < 5.0
    }
)
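To make the accept-or-reject logic concrete, the following plain-Java sketch simulates the condition above without Flink. It assumes a hypothetical Priced record standing in for SubEvent and a list standing in for ctx.getEventsForPattern("middle"). It also assumes that every accepted event is appended to that list, whereas Flink's matcher may track several partial matches at once.

```java
import java.util.ArrayList;
import java.util.List;

class IterativeConditionSketch {
    // Hypothetical stand-in for SubEvent: only the name and price matter here.
    record Priced(String name, double price) {}

    // Same test as the filter above: the name must start with "foo", and the
    // current price plus the prices already accepted must stay strictly below 5.0.
    static boolean accept(Priced value, List<Priced> acceptedSoFar) {
        if (!value.name().startsWith("foo")) {
            return false; // cheap check first, before iterating over earlier events
        }
        double sum = value.price();
        for (Priced p : acceptedSoFar) { // a single pass over the earlier events
            sum += p.price();
        }
        return Double.compare(sum, 5.0) < 0;
    }

    // Feeds events in order; each accepted event becomes visible to later checks.
    static List<Priced> run(List<Priced> input) {
        List<Priced> accepted = new ArrayList<>();
        for (Priced p : input) {
            if (accept(p, accepted)) {
                accepted.add(p);
            }
        }
        return accepted;
    }
}
```

With the inputs foo1 (2.0), bar (1.0), foo2 (2.5), and foo3 (1.0), run keeps foo1 and foo2; foo3 is rejected because 1.0 + 2.0 + 2.5 = 5.5 is not below 5.0.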

Attention The call to Context.getEventsForPattern(...) has to find the elements that belong to the pattern, and the cost of this operation can vary. When implementing your condition, try to minimize the number of calls to this method.

Simple Conditions: This type of condition extends the aforementioned IterativeCondition class. A simple condition decides whether to accept an event based only on properties of the event itself.

start.where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
        return ... // some condition
    }
});
start.where(event => ... /* some condition */)

We can also restrict the type of the accepted event to some subtype of the initial event type (here Event) via the subtype method.

start.subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
    @Override
    public boolean filter(SubEvent value) {
        return ... // some condition
    }
});
start.subtype(classOf[SubEvent]).where(subEvent => ... /* some condition */)

As can be seen here, the subtype condition can also be combined with an additional filter condition on the subtype. In fact, you can always provide multiple conditions by calling where and subtype multiple times. These conditions are then combined using the logical AND operator.

To construct OR conditions, call the or method with the respective filter function. The existing filter function is then ORed with the given one.

pattern.where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
        return ... // some condition
    }
}).or(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
        return ... // or condition
    }
});
pattern.where(event => ... /* some condition */).or(event => ... /* or condition */)
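Conceptually, repeated where and subtype calls combine like java.util.function.Predicate.and, while or combines like Predicate.or. The sketch below is only that analogy, written in plain Java with hypothetical Event and SubEvent classes; Flink evaluates its own condition classes rather than Predicate instances.

```java
import java.util.function.Predicate;

class ConditionCombinationSketch {
    // Hypothetical event hierarchy mirroring Event / SubEvent in the text.
    static class Event {
        final String name;
        Event(String name) { this.name = name; }
    }

    static class SubEvent extends Event {
        final double volume;
        SubEvent(String name, double volume) { super(name); this.volume = volume; }
    }

    // Like subtype(SubEvent.class).where(volume >= 10.0): both must hold (AND).
    // Predicate.and short-circuits, so the cast only runs for SubEvent instances.
    static final Predicate<Event> SUBTYPE_AND_WHERE =
            ((Predicate<Event>) e -> e instanceof SubEvent)
                    .and(e -> ((SubEvent) e).volume >= 10.0);

    // Like where(name is "error").or(name is "critical"): either may hold (OR).
    static final Predicate<Event> WHERE_OR =
            ((Predicate<Event>) e -> e.name.equals("error"))
                    .or(e -> e.name.equals("critical"));
}
```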

Next, we can append further states to detect complex patterns. We can control the contiguity of two succeeding events to be accepted by the pattern.

Strict contiguity means that two matching events have to directly follow one another, with no other events in between. A strict contiguity pattern state can be created via the next method.

Pattern<Event, ?> strictNext = start.next("middle");
val strictNext: Pattern[Event, _] = start.next("middle")

Non-strict contiguity means that other events are allowed to occur in-between two matching events. A non-strict contiguity pattern state can be created via the followedBy or followedByAny method.

Pattern<Event, ?> nonStrictNext = start.followedBy("middle");
val nonStrictNext : Pattern[Event, _] = start.followedBy("middle")

For non-strict contiguity, you can choose whether only the first succeeding matching event is matched (followedBy) or all of them (followedByAny). In the latter case, multiple matches are emitted for the same beginning.

Pattern<Event, ?> nonStrictNext = start.followedByAny("middle");
val nonStrictNext : Pattern[Event, _] = start.followedByAny("middle")
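The difference between next, followedBy, and followedByAny can be illustrated with a small plain-Java simulation of the two-state pattern "a" then "b" over a list of event names. This is a sketch of the semantics described above, not Flink's matching engine: it ignores time windows, and the index-pair output format is made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class ContiguitySketch {
    enum Mode { NEXT, FOLLOWED_BY, FOLLOWED_BY_ANY }

    // Matches for the pattern "a" then "b"; each match is recorded as "index(a)-index(b)".
    static List<String> matches(List<String> events, Mode mode) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).equals("a")) continue;
            for (int j = i + 1; j < events.size(); j++) {
                boolean isB = events.get(j).equals("b");
                if (mode == Mode.NEXT) {
                    // strict contiguity: only the directly following event may match
                    if (isB) out.add(i + "-" + j);
                    break;
                }
                if (isB) {
                    out.add(i + "-" + j);
                    // followedBy stops at the first matching "b"; followedByAny keeps looking
                    if (mode == Mode.FOLLOWED_BY) break;
                }
            }
        }
        return out;
    }
}
```

For the sequence a c b b, next yields no match because c sits between a and the first b, followedBy yields only 0-2, and followedByAny yields 0-2 and 0-3.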

It is also possible to define a temporal constraint for the pattern to be valid. For example, one can define that a pattern should occur within 10 seconds via the within method. Temporal patterns are supported for both processing and event time.

next.within(Time.seconds(10));
next.within(Time.seconds(10))


Pattern Operations (Java)
Begin

Defines a starting pattern state:

Pattern<Event, ?> start = Pattern.<Event>begin("start");
Next

Appends a new pattern state. A matching event has to directly succeed the previous matching event:

Pattern<Event, ?> next = start.next("next");
FollowedBy

Appends a new pattern state. Other events can occur between a matching event and the previous matching event:

Pattern<Event, ?> followedBy = start.followedBy("next");
Where

Defines a condition for the current pattern state. An event can match the state only if it satisfies the condition:

patternState.where(new IterativeCondition<Event>() {
    @Override
    public boolean filter(Event value, Context ctx) throws Exception {
        return ... // some condition
    }
});
Or

Adds a new filter condition which is ORed with an existing filter condition. An event can match the state only if it passes the combined condition:

patternState.where(new IterativeCondition<Event>() {
    @Override
    public boolean filter(Event value, Context ctx) throws Exception {
        return ... // some condition
    }
}).or(new IterativeCondition<Event>() {
    @Override
    public boolean filter(Event value, Context ctx) throws Exception {
        return ... // alternative condition
    }
});
Subtype

Defines a subtype condition for the current pattern state. An event can match the state only if it is of this subtype:

patternState.subtype(SubEvent.class);
Within

Defines the maximum time interval for an event sequence to match the pattern. If a non-completed event sequence exceeds this time, it is discarded:

patternState.within(Time.seconds(10));
ZeroOrMore

Specifies that this pattern can occur zero or more times (Kleene star). This means any number of events can be matched in this state.

If eagerness is enabled (the default), the pattern A*B applied to the sequence A1 A2 B generates the matches B, A1 B, and A1 A2 B. If eagerness is disabled, it generates B, A1 B, A2 B, and A1 A2 B.

By default, relaxed internal contiguity (between subsequent events of a loop) is used. For more information on internal contiguity, see Consecutive.

patternState.zeroOrMore();
OneOrMore

Specifies that this pattern can occur one or more times (Kleene plus). This means that at least one event, and arbitrarily many, can be matched in this state.

If eagerness is enabled (the default), the pattern A+B applied to the sequence A1 A2 B generates the matches A1 B and A1 A2 B. If eagerness is disabled, it generates A1 B, A2 B, and A1 A2 B.

By default, relaxed internal contiguity (between subsequent events of a loop) is used. For more information on internal contiguity, see Consecutive.

patternState.oneOrMore();
Optional

Specifies that this pattern can occur zero times or once.

patternState.optional();
Times

Specifies the exact number of times that this pattern should be matched.

By default, relaxed internal contiguity (between subsequent events of a loop) is used. For more information on internal contiguity, see Consecutive.

patternState.times(2);
Consecutive

Works in conjunction with zeroOrMore, oneOrMore, or times. Specifies that any non-matching event breaks the loop.

If it is not applied, relaxed contiguity (as in followedBy) is used.

For example, consider the following pattern:

Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
           @Override
           public boolean filter(Event value) throws Exception {
               return value.getName().equals("c");
           }
      })
      .followedBy("middle").where(new SimpleCondition<Event>() {
           @Override
           public boolean filter(Event value) throws Exception {
               return value.getName().equals("a");
           }
      })
      .oneOrMore(true).consecutive()
      .followedBy("end1").where(new SimpleCondition<Event>() {
           @Override
           public boolean filter(Event value) throws Exception {
               return value.getName().equals("b");
           }
      });

For the input sequence C D A1 A2 A3 D A4 B, this pattern generates the following matches:

with consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}

without consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}

NOTICE: This option can be applied only to zeroOrMore(), oneOrMore() and times()!
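The two result lists above can be reproduced with a small plain-Java simulation. It is a sketch under narrow assumptions, not Flink's matching engine: it handles only this example (one C, one final B, eager looping), treats each match's loop contents as a prefix of the collected A events, and lets a non-matching event end the collection only when consecutive is requested.

```java
import java.util.ArrayList;
import java.util.List;

class ConsecutiveSketch {
    // Matches for begin(C) -> oneOrMore(A) -> followedBy(B) on the example sequence.
    static List<List<String>> matches(List<String> seq, boolean consecutive) {
        int start = seq.indexOf("C");
        int end = seq.lastIndexOf("B");
        List<String> loop = new ArrayList<>();
        for (int i = start + 1; i < end; i++) {
            String e = seq.get(i);
            if (e.startsWith("A")) {
                loop.add(e);
            } else if (consecutive && !loop.isEmpty()) {
                break; // with consecutive, a non-matching event breaks the loop
            }
            // without consecutive, non-matching events are simply skipped
        }
        List<List<String>> result = new ArrayList<>();
        for (int n = 1; n <= loop.size(); n++) { // eager: prefixes of the loop
            List<String> match = new ArrayList<>();
            match.add("C");
            match.addAll(loop.subList(0, n));
            match.add("B");
            result.add(match);
        }
        return result;
    }
}
```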

Pattern Operations (Scala)
Begin

Defines a starting pattern state:

val start = Pattern.begin[Event]("start")
Next

Appends a new pattern state. A matching event has to directly succeed the previous matching event:

val next = start.next("middle")
FollowedBy

Appends a new pattern state. Other events can occur between a matching event and the previous matching event:

val followedBy = start.followedBy("middle")
Where

Defines a filter condition for the current pattern state. An event can match the state only if it passes the filter:

patternState.where(event => ... /* some condition */)
Or

Adds a new filter condition which is ORed with an existing filter condition. An event can match the state only if it passes the combined condition:

patternState.where(event => ... /* some condition */)
    .or(event => ... /* alternative condition */)
Subtype

Defines a subtype condition for the current pattern state. An event can match the state only if it is of this subtype:

patternState.subtype(classOf[SubEvent])
Within

Defines the maximum time interval for an event sequence to match the pattern. If a non-completed event sequence exceeds this time, it is discarded:

patternState.within(Time.seconds(10))
ZeroOrMore

Specifies that this pattern can occur zero or more times (Kleene star). This means any number of events can be matched in this state.

If eagerness is enabled (the default), the pattern A*B applied to the sequence A1 A2 B generates the matches B, A1 B, and A1 A2 B. If eagerness is disabled, it generates B, A1 B, A2 B, and A1 A2 B.

By default, relaxed internal contiguity (between subsequent events of a loop) is used. For more information on internal contiguity, see Consecutive.

patternState.zeroOrMore()
OneOrMore

Specifies that this pattern can occur one or more times (Kleene plus). This means that at least one event, and arbitrarily many, can be matched in this state.

If eagerness is enabled (the default), the pattern A+B applied to the sequence A1 A2 B generates the matches A1 B and A1 A2 B. If eagerness is disabled, it generates A1 B, A2 B, and A1 A2 B.

By default, relaxed internal contiguity (between subsequent events of a loop) is used. For more information on internal contiguity, see Consecutive.

patternState.oneOrMore()
Optional

Specifies that this pattern can occur zero times or once.

patternState.optional()
Times

Specifies the exact number of times that this pattern should be matched.

By default, relaxed internal contiguity (between subsequent events of a loop) is used. For more information on internal contiguity, see Consecutive.

patternState.times(2)
Consecutive

Works in conjunction with zeroOrMore, oneOrMore, or times. Specifies that any non-matching event breaks the loop.

If it is not applied, relaxed contiguity (as in followedBy) is used.

Pattern.begin[Event]("start").where(_.getName == "c")
       .followedBy("middle").where(_.getName == "a")
                            .oneOrMore(true).consecutive()
       .followedBy("end1").where(_.getName == "b")

For the input sequence C D A1 A2 A3 D A4 B, this pattern generates the following matches:

with consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}

without consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}

NOTICE: This option can be applied only to zeroOrMore(), oneOrMore() and times()!
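The eager and non-eager result lists quoted for A*B and A+B over the sequence A1 A2 B are consistent with a simple reading: eager matching keeps only prefixes of the A events, while non-eager matching allows any order-preserving subset. The plain-Java sketch below encodes that reading. It is an interpretation of those two examples, not a description of Flink's implementation, and the minTimes parameter (0 for zeroOrMore, 1 for oneOrMore) is introduced purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class EagernessSketch {
    // Eager reading: the loop holds a prefix of the A events (A1, then A1 A2, ...).
    static List<List<String>> eager(List<String> aEvents, int minTimes) {
        List<List<String>> out = new ArrayList<>();
        for (int n = minTimes; n <= aEvents.size(); n++) {
            List<String> match = new ArrayList<>(aEvents.subList(0, n));
            match.add("B");
            out.add(match);
        }
        return out;
    }

    // Non-eager reading: any order-preserving subset of the A events forms the loop.
    static List<List<String>> nonEager(List<String> aEvents, int minTimes) {
        List<List<String>> out = new ArrayList<>();
        for (int mask = 0; mask < (1 << aEvents.size()); mask++) {
            if (Integer.bitCount(mask) < minTimes) continue; // too few A events
            List<String> match = new ArrayList<>();
            for (int i = 0; i < aEvents.size(); i++) {
                if ((mask & (1 << i)) != 0) match.add(aEvents.get(i));
            }
            match.add("B");
            out.add(match);
        }
        return out;
    }
}
```

For A1 A2 B, eager(…, 0) returns the three matches listed for A*B and nonEager(…, 1) returns the three listed for A+B with eagerness disabled.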

Detecting Patterns

To run a stream of events against your pattern, you have to create a PatternStream. Given an input stream input and a pattern pattern, you create the PatternStream by calling:

DataStream<Event> input = ...
Pattern<Event, ?> pattern = ...

PatternStream<Event> patternStream = CEP.pattern(input, pattern);
val input : DataStream[Event] = ...
val pattern : Pattern[Event, _] = ...

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

Selecting from Patterns

Once you have obtained a PatternStream you can select from detected event sequences via the select or flatSelect methods.

The select method requires a PatternSelectFunction implementation. A PatternSelectFunction has a select method, which is called for each matching event sequence. It receives a map of the matched events, whose keys are the names of the states and whose values are the events matched to those states. The select method returns exactly one result.

// Alert is assumed to have a constructor taking the start and end events
class MyPatternSelectFunction implements PatternSelectFunction<Event, Alert> {
    @Override
    public Alert select(Map<String, Event> pattern) {
        Event startEvent = pattern.get("start");
        Event endEvent = pattern.get("end");
        return new Alert(startEvent, endEvent);
    }
}

A PatternFlatSelectFunction is similar to the PatternSelectFunction, with the only difference that it can return an arbitrary number of results. To do this, its flatSelect method has an additional Collector parameter, which is used to emit the output elements.

// Alert(Event, Event) and Event.getValue() are assumed for illustration
class MyPatternFlatSelectFunction implements PatternFlatSelectFunction<Event, Alert> {
    @Override
    public void flatSelect(Map<String, Event> pattern, Collector<Alert> collector) {
        Event startEvent = pattern.get("start");
        Event endEvent = pattern.get("end");

        // emit one Alert per unit of the start event's value
        for (int i = 0; i < startEvent.getValue(); i++) {
            collector.collect(new Alert(startEvent, endEvent));
        }
    }
}
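The contract of the two functions (one result per match for select, any number of results for flatSelect) can be tried out without a cluster. The plain-Java sketch below uses a hypothetical Event record and a java.util.function.Consumer in place of Flink's Collector; it mirrors the shape of the functions above but does not implement Flink's interfaces.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class SelectSketch {
    // Hypothetical event; getValue() mirrors the call used in the flat select example.
    record Event(String name, int value) {
        int getValue() { return value; }
    }

    // select: exactly one result per matched sequence.
    static String select(Map<String, Event> match) {
        return match.get("start").name() + "->" + match.get("end").name();
    }

    // flatSelect: any number of results, pushed to a collector-like consumer.
    static void flatSelect(Map<String, Event> match, Consumer<String> out) {
        Event start = match.get("start");
        for (int i = 0; i < start.getValue(); i++) {
            out.accept(start.name() + "->" + match.get("end").name());
        }
    }

    // Convenience wrapper that gathers everything flatSelect emits.
    static List<String> collectFlat(Map<String, Event> match) {
        List<String> results = new ArrayList<>();
        flatSelect(match, results::add);
        return results;
    }
}
```

A start event with value 3 yields one result from select and three from flatSelect; with value 0, flatSelect emits nothing, which a select function cannot express.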

The select method takes a selection function as argument, which is called for each matching event sequence. It receives a map of string/event pairs of the matched events. The string is defined by the name of the state to which the event has been matched. The selection function returns exactly one result per call.

def selectFn(pattern : mutable.Map[String, IN]): OUT = {
    val startEvent = pattern("start")
    val endEvent = pattern("end")
    OUT(startEvent, endEvent)
}

The flatSelect method is similar to the select method. Their only difference is that the function passed to the flatSelect method can return an arbitrary number of results per call. In order to do this, the function for flatSelect has an additional Collector parameter which is used for the element output.

def flatSelectFn(pattern : mutable.Map[String, IN], collector : Collector[OUT]): Unit = {
    val startEvent = pattern("start")
    val endEvent = pattern("end")
    for (_ <- 0 until startEvent.getValue) {
        collector.collect(OUT(startEvent, endEvent))
    }
}

Handling Timed Out Partial Patterns

Whenever a pattern has a window length associated with it via the within method, it is possible that partial event patterns will be discarded because they exceed the window length. To react to these timeout events, the select and flatSelect API calls allow a timeout handler to be specified. This timeout handler is called for each partial event pattern which has timed out. The timeout handler receives all the events that have been matched so far by the pattern, and the timestamp when the timeout was detected.

To treat partial patterns, the select and flatSelect API calls offer an overloaded version which takes a PatternTimeoutFunction/PatternFlatTimeoutFunction as the first parameter and the familiar PatternSelectFunction/PatternFlatSelectFunction as the second parameter. The return type of the timeout function can be different from that of the select function. The timeout event and the select event are wrapped in Either.Left and Either.Right respectively, so that the resulting data stream is of type org.apache.flink.types.Either.

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

DataStream<Either<TimeoutEvent, ComplexEvent>> result = patternStream.select(
    new PatternTimeoutFunction<Event, TimeoutEvent>() {...},
    new PatternSelectFunction<Event, ComplexEvent>() {...}
);

DataStream<Either<TimeoutEvent, ComplexEvent>> flatResult = patternStream.flatSelect(
    new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {...},
    new PatternFlatSelectFunction<Event, ComplexEvent>() {...}
);
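The split between timed-out partial matches and completed matches can be sketched in plain Java with a minimal Either stand-in. The Partial record, the String payloads, and the rule that a partial match times out once now - startTs >= windowMs are assumptions for illustration; they are not taken from Flink's API or implementation.

```java
import java.util.ArrayList;
import java.util.List;

class TimeoutSketch {
    // Minimal Either stand-in: Left carries a timed-out partial match, Right a completed one.
    interface Either {}
    record Left(String partial, long timeoutTimestamp) implements Either {}
    record Right(String complete) implements Either {}

    // Hypothetical partial match: its start time and whether all states have matched.
    record Partial(String id, long startTs, boolean complete) {}

    // Completed matches go Right; incomplete ones older than the window go Left,
    // tagged with the time the timeout was detected; the rest stay pending.
    static List<Either> evaluate(List<Partial> partials, long now, long windowMs) {
        List<Either> out = new ArrayList<>();
        for (Partial p : partials) {
            if (p.complete()) {
                out.add(new Right(p.id()));
            } else if (now - p.startTs() >= windowMs) {
                out.add(new Left(p.id(), now));
            }
        }
        return out;
    }
}
```

With a 10-second window evaluated at 10,000 ms, a partial match started at 0 times out (Left), a completed match is emitted (Right), and a partial match started at 8,000 ms stays pending.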

In order to treat partial patterns, the select API call offers an overloaded version which takes as the first parameter a timeout function and as second parameter a selection function. The timeout function is called with a map of string-event pairs of the partial match which has timed out and a long indicating when the timeout occurred. The string is defined by the name of the state to which the event has been matched. The timeout function returns exactly one result per call. The return type of the timeout function can be different from the select function. The timeout event and the select event are wrapped in Left and Right respectively so that the resulting data stream is of type Either.

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

val result: DataStream[Either[TimeoutEvent, ComplexEvent]] = patternStream.select{
    (pattern: mutable.Map[String, Event], timestamp: Long) => TimeoutEvent()
} {
    pattern: mutable.Map[String, Event] => ComplexEvent()
}

The flatSelect API call offers the same overloaded version, which takes a timeout function as the first parameter and a selection function as the second. In contrast to the select functions, the flatSelect functions are called with a Collector, which can be used to emit an arbitrary number of events.

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

val result: DataStream[Either[TimeoutEvent, ComplexEvent]] = patternStream.flatSelect{
    (pattern: mutable.Map[String, Event], timestamp: Long, out: Collector[TimeoutEvent]) =>
        out.collect(TimeoutEvent())
} {
    (pattern: mutable.Map[String, Event], out: Collector[ComplexEvent]) =>
        out.collect(ComplexEvent())
}

Handling Lateness in Event Time

In CEP the order in which elements are processed matters. To guarantee that elements are processed in the correct order when working in event time, an incoming element is initially put in a buffer where elements are sorted in ascending order based on their timestamp, and when a watermark arrives, all the elements in this buffer with timestamps smaller than that of the watermark are processed. This implies that elements between watermarks are processed in event-time order.

Attention The library assumes correctness of the watermark when working in event time.

To also guarantee that elements across watermarks are processed in event-time order, Flink’s CEP library assumes correctness of the watermark and considers as late any element whose timestamp is smaller than that of the last seen watermark. Late elements are not processed further, but they can be redirected to a side output dedicated to them (see //flink.iteblog.com/dev/stream/side_output.html).
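The buffering and lateness rules described above can be simulated with a priority queue of timestamps. This plain-Java sketch models the described behavior and is not Flink's operator: it tracks only timestamps, and the processed and late lists stand in for the pattern matcher and the late-data side output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

class EventTimeBufferSketch {
    private final PriorityQueue<Long> buffer = new PriorityQueue<>(); // ascending timestamps
    private long lastWatermark = Long.MIN_VALUE;
    final List<Long> processed = new ArrayList<>(); // order in which the matcher sees elements
    final List<Long> late = new ArrayList<>();      // stand-in for the late-data side output

    // An element behind the last seen watermark is late; otherwise it waits in the buffer.
    void onElement(long timestamp) {
        if (timestamp < lastWatermark) {
            late.add(timestamp);
        } else {
            buffer.add(timestamp);
        }
    }

    // A watermark releases, in timestamp order, every buffered element older than it.
    void onWatermark(long watermark) {
        lastWatermark = watermark;
        while (!buffer.isEmpty() && buffer.peek() < watermark) {
            processed.add(buffer.poll());
        }
    }
}
```

Feeding elements 5, 3, 8, then watermark 6, then elements 4 and 7, then watermark 10 processes 3, 5, 7, 8 in that order, while 4 is reported as late because it is behind watermark 6.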

To access the stream of late elements, you first need to specify that you want to get the late data using .sideOutputLateData(OutputTag) on the PatternStream returned using the CEP.pattern(...) call. If you do not do so, the late elements will be silently dropped. Then, you can get the side-output stream using the .getSideOutput(OutputTag) on the aforementioned PatternStream, and providing as argument the output tag used in the .sideOutputLateData(OutputTag):

final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};

PatternStream<T> patternStream = CEP.pattern(...)
    .sideOutputLateData(lateOutputTag);

// main output with matches
DataStream<O> result = patternStream.select(...);

// side output containing the late events
DataStream<T> lateStream = patternStream.getSideOutput(lateOutputTag);
val lateOutputTag = OutputTag[T]("late-data")

val patternStream: PatternStream[T] = CEP.pattern(...)
    .sideOutputLateData(lateOutputTag)

// main output with matches
val result = patternStream.select(...)

// side output containing the late events
val lateStream = patternStream.getSideOutput(lateOutputTag)

Examples

The following example detects the pattern start, middle(name = "error") -> end(name = "critical") on a keyed data stream of Events. The events are keyed by their ids and a valid pattern has to occur within 10 seconds. The whole processing is done with event time.

StreamExecutionEnvironment env = ...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Event> input = ...

DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, Integer>() {
	@Override
	public Integer getKey(Event value) throws Exception {
		return value.getId();
	}
});

Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
	.next("middle").where(new SimpleCondition<Event>() {
		@Override
		public boolean filter(Event value) throws Exception {
			return value.getName().equals("error");
		}
	}).followedBy("end").where(new SimpleCondition<Event>() {
		@Override
		public boolean filter(Event value) throws Exception {
			return value.getName().equals("critical");
		}
	}).within(Time.seconds(10));

PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);

DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() {
	@Override
	public Alert select(Map<String, Event> pattern) throws Exception {
		return createAlert(pattern);
	}
});
val env : StreamExecutionEnvironment = ...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val input : DataStream[Event] = ...

val partitionedInput = input.keyBy(event => event.getId)

val pattern = Pattern.begin[Event]("start")
  .next("middle").where(_.getName == "error")
  .followedBy("end").where(_.getName == "critical")
  .within(Time.seconds(10))

val patternStream = CEP.pattern(partitionedInput, pattern)

val alerts = patternStream.select(createAlert(_))