Wait for cancellation event with CEP

Maxim Parkachov
Hi everyone,

I need to implement the following functionality. There is a Kafka topic that receives "forward" events, and the same topic also receives "cancel" events. For each "forward" event I need to wait 1 minute for a possible "cancel" event. I can uniquely match the two events. If the "cancel" event arrives within this minute, I need to drop the "forward" event; otherwise I need to pass the "forward" event on to another Kafka topic. "Cancel" events that arrive more than 1 minute after their "forward" event can be ignored.

I was thinking of implementing a ProcessFunction that puts each "forward" event into state and registers a timer for it. If a "cancel" event arrives, I delete the "forward" event from state; if the timer fires first, I emit the "forward" event.
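
To make the state-plus-timer idea concrete, here is a minimal plain-Java model of the logic. It deliberately does not use the Flink API: the map stands in for keyed state, fireTimers stands in for the timer callback, and the class name, method names and String ids are hypothetical, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Plain-Java model of "buffer forward events, drop on cancel, emit on timeout". */
public class CancellationBuffer {
    private static final long WAIT_MILLIS = 60_000L; // the 1-minute wait from the post

    // Hypothetical event id -> time at which the buffered forward event may be emitted.
    private final Map<String, Long> deadlines = new HashMap<>();

    /** Buffers a forward event and arms its "timer". */
    public void onForward(String id, long nowMillis) {
        deadlines.put(id, nowMillis + WAIT_MILLIS);
    }

    /** Drops the buffered forward event if the cancel is in time; other cancels are ignored. */
    public void onCancel(String id, long nowMillis) {
        Long deadline = deadlines.get(id);
        if (deadline != null && nowMillis < deadline) {
            deadlines.remove(id);
        }
    }

    /** Returns the ids whose wait elapsed without a cancel and removes them from the buffer. */
    public List<String> fireTimers(long nowMillis) {
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= nowMillis) {
                emitted.add(entry.getKey());
                it.remove();
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        CancellationBuffer buffer = new CancellationBuffer();
        buffer.onForward("a", 0L);
        buffer.onForward("b", 0L);
        buffer.onCancel("a", 30_000L);                  // within the minute: "a" is dropped
        System.out.println(buffer.fireTimers(60_000L)); // prints [b]
    }
}
```

In a real job the map would presumably become keyed state and fireTimers the timer callback of the function, with the stream keyed by the id that links a "forward" event to its "cancel" event.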

My question: is there a simpler way to implement the same logic, possibly with CEP?

Thanks,
Maxim.

Re: Wait for cancellation event with CEP

Till Rohrmann
Hi Maxim,

I think your problem should be solvable with the CEP library:

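    import org.apache.flink.cep.CEP;
    import org.apache.flink.cep.PatternStream;
    import org.apache.flink.cep.functions.PatternProcessFunction;
    import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    import java.util.List;
    import java.util.Map;

    public class StreamingJob {

        public static void main(String[] args) throws Exception {
            // set up the streaming execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Note: to pair each forward event only with its own cancel event,
            // key the stream by the shared id before applying the pattern.
            DataStream<Event> input = ...;

            // "start" is a forward event, "end" is a cancel event within one minute
            Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
                new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event event) {
                        return event.isForward();
                    }
                }
            ).followedBy("end").where(
                new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event event) {
                        return event.isCancel();
                    }
                }
            ).within(Time.minutes(1));

            PatternStream<Event> patternStream = CEP.pattern(input, pattern);

            OutputTag<Alert> outputTag = new OutputTag<Alert>("side-output"){};

            SingleOutputStreamOperator<Alert> ignored = patternStream.process(new MyPatternProcessFunction(outputTag));

            // timed-out partial matches, i.e. forward events that were not cancelled
            final DataStream<Alert> sideOutput = ignored.getSideOutput(outputTag);

            // execute program
            env.execute("Flink Streaming Java API Skeleton");
        }

        // No type parameters here: they would shadow the Event and Alert classes below.
        public static class MyPatternProcessFunction extends PatternProcessFunction<Event, Alert> implements TimedOutPartialMatchHandler<Event> {
            private final OutputTag<Alert> outputTag;

            public MyPatternProcessFunction(OutputTag<Alert> outputTag) {
                this.outputTag = outputTag;
            }

            @Override
            public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<Alert> out) throws Exception {
                // don't do anything since we want the time out case
            }

            @Override
            public void processTimedOutMatch(Map<String, List<Event>> match, Context ctx) throws Exception {
                // no cancel arrived in time: emit the forward event to the side output
                Event startEvent = match.get("start").get(0);
                ctx.output(outputTag, new Alert(startEvent));
            }
        }

        public static class Event {
            // placeholder field; replace with the real event type and its checks
            private boolean cancel;

            boolean isForward() { return !cancel; }

            boolean isCancel() { return cancel; }
        }

        public static class Alert {
            private final Event event;

            public Alert(Event event) {
                this.event = event;
            }
        }
    }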

So what we are doing here is defining a pattern: a forward event followed by any cancel event. Moreover, we say that this must happen within 1 minute. If the cancel event does not arrive in time, the partial match times out, and in the timeout handler we can create the follow-up event.

Cheers,
Till

In reply to Maxim Parkachov's message of Thu, Apr 30, 2020 at 12:48 PM.

Re: Wait for cancellation event with CEP

Maxim Parkachov
Hi Till,

Thank you for the very detailed answer; it is absolutely clear now.

Regards,
Maxim.

In reply to Till Rohrmann's message of Thu, Apr 30, 2020 at 7:19 PM.