Flink CEP: can't process PatternStream

Maminspapin
Hello everyone.
 
I’m trying to use the Flink CEP library to fetch some events by pattern. As a first step I’ve created a simple HelloWorld project, but I have a problem exactly like the one described here: https://stackoverflow.com/questions/39575991/flink-cep-no-results-printed
 
 
Nothing happens in this block:
 
        DataStream<String> alerts = patternStream
                .process(new PatternProcessFunction<String, String>() {
                    @Override
                    public void processMatch(Map<String, List<String>> map, Context context, Collector<String> collector)
                            throws Exception {
                       
                        String first = map.get("first").get(0);
                        System.out.println("First: " + first);
                    }
                });
        alerts.print();
 
Can someone help me understand the cause?
 
Thanks,
Yuri L.
 

Re: Flink CEP: can't process PatternStream

Dawid Wysakowicz-2

Hi Yuri,

Which Flink version are you using? Is it 1.12? In 1.12 we changed the default TimeCharacteristic to EventTime, so your program needs timestamps and watermarks [1] to work correctly. If you want to apply your pattern in processing time instead, you can do:

PatternStream<String> patternStream = CEP.pattern(stream, pattern).inProcessingTime();

Basically, you are facing exactly the same problem as described in the Stack Overflow entry you posted.
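
To make the reason concrete, here is a toy model in plain Java of what happens in event time. It is not Flink code, and the class and method names (EventTimeBufferSketch, onElement, onWatermark) are made up for illustration. It only assumes the general behaviour that, in event time, incoming elements are held in a timestamp-sorted buffer and handed to the pattern matcher once a watermark passes them. Without watermarks, everything stays in the buffer and no match is ever processed.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model (NOT Flink code) of an event-time buffer in front of a pattern matcher. */
class EventTimeBufferSketch {

    /** An element paired with its event timestamp in milliseconds. */
    static final class Timestamped {
        final long timestamp;
        final String value;

        Timestamped(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Elements wait here, ordered by ascending event timestamp.
    private final PriorityQueue<Timestamped> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Timestamped t) -> t.timestamp));

    private long currentWatermark = Long.MIN_VALUE;

    /** Buffers an element; nothing is released until a watermark arrives. */
    void onElement(long timestamp, String value) {
        buffer.add(new Timestamped(timestamp, value));
    }

    /** Advances the watermark and returns, in timestamp order, all elements at or below it. */
    List<String> onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<String> released = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().timestamp <= currentWatermark) {
            released.add(buffer.poll().value);
        }
        return released;
    }

    /** Number of elements still waiting for a watermark. */
    int buffered() {
        return buffer.size();
    }

    public static void main(String[] args) {
        EventTimeBufferSketch sketch = new EventTimeBufferSketch();
        sketch.onElement(2000L, "b");
        sketch.onElement(1000L, "a");
        System.out.println(sketch.buffered());          // 2: both elements are still waiting
        System.out.println(sketch.onWatermark(1500L));  // [a]
        System.out.println(sketch.onWatermark(2500L));  // [b]
    }
}
```

In this sketch, a job that never calls onWatermark corresponds to a pipeline without a watermark strategy: elements arrive, but none reach the matcher. Switching to processing time, as in the line above, avoids the buffer entirely.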

Best,

Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#generating-watermarks

On 26/02/2021 09:18, Люльченко Юрий Николаевич wrote:


Re: Flink CEP: can't process PatternStream

Maminspapin
Hello, Dawid.

Yes, I’m using 1.12, and my code is working now. Thank you very much for your comment.
 
Yuri L.


