Flink CEP: can't process PatternStream (v 1.12, EventTime mode)

Flink CEP: can't process PatternStream (v 1.12, EventTime mode)

Maminspapin
Hello,
 
I asked a question earlier today and got an answer: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-can-t-process-PatternStream-td41722.html, and it is now clear to me how PatternStream works with processing time.
 
But I need help again: I can't write correct code to run a PatternStream in event-time mode.
I think the problem is in how I assign the watermark strategy.
 
My code is below; the Flink version is 1.12:
 

public class Main {

    public static void main(String[] args) throws Exception {

        Properties properties = new Properties();
        properties.put("group.id", "Flink");
        properties.put("bootstrap.servers", "broker:9092");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "test",
                new SimpleStringSchema(),
                properties);

        DataStream<String> stream = env
                .addSource(consumer)
                .map((MapFunction<String, String>) s -> {
                    // Just getting an object model
                    return model.toString();
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                        .withTimestampAssigner((event, timestamp) -> {
                            Model model = new Gson().fromJson(event, Model.class);
                            return model.getServerTime();
                        }));

        stream.print("Stream");

        Pattern<String, String> firstPattern = Pattern
                .<String>begin("first")
                .where(new IterativeCondition<String>() {
                    @Override
                    public boolean filter(String s, Context<String> context) throws Exception {
                        return s.contains("Start");
                    }
                });

        DataStream<String> result = CEP
                .pattern(stream, firstPattern)
                .inEventTime() // default TimeCharacteristic for 1.12
                .process(new PatternProcessFunction<String, String>() {
                    @Override
                    public void processMatch(Map<String, List<String>> map, Context context, Collector<String> collector) throws Exception {
                        collector.collect(map.get("first").get(0));
                    }
                });

        result.print("Result");

        env.execute();
    }
}

 
Please help me correct the code :)
 
Thanks,
Yuri L.

 
 
 

Re: Flink CEP: can't process PatternStream (v 1.12, EventTime mode)

Dawid Wysakowicz-2

Hi,

What exactly is the problem? Is it that no matches are being generated?

Usually the problem is idle parallel instances[1]. Data needs to be flowing through every parallel instance for the watermark to progress. You can also read about this in the context of Kafka partitions[2].

Best,

Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#dealing-with-idle-sources

[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
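
To make the idle-instance effect concrete, here is a minimal sketch in plain Java (no Flink dependency). It only models the rule that a downstream operator's watermark is the minimum of the watermarks of its inputs, which is why one silent parallel instance can keep event-time CEP from ever emitting a match. The class name IdleWatermarkSketch, the method combinedWatermark and the sample timestamp are made up for illustration, and the handling of the "all inputs idle" case is a simplification rather than Flink's actual implementation.

```java
public class IdleWatermarkSketch {

    /** Watermark of an input that has not emitted any watermark yet. */
    public static final long NO_WATERMARK = Long.MIN_VALUE;

    /**
     * Combines per-input watermarks: the result is the minimum over all
     * inputs that are not marked idle. Hypothetical helper, not a Flink API.
     */
    public static long combinedWatermark(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (idle[i]) {
                continue; // idle inputs do not hold the watermark back
            }
            anyActive = true;
            min = Math.min(min, watermarks[i]);
        }
        // Simplification: with no active input we just report "no watermark".
        return anyActive ? min : NO_WATERMARK;
    }

    public static void main(String[] args) {
        // Two parallel instances; only the first one ever receives events.
        long[] watermarks = {1_614_340_000_000L, NO_WATERMARK};

        // Nothing marked idle: the silent instance pins the combined watermark.
        long stalled = combinedWatermark(watermarks, new boolean[] {false, false});

        // Silent instance marked idle: the combined watermark can advance.
        long advancing = combinedWatermark(watermarks, new boolean[] {false, true});

        System.out.println("stalled at NO_WATERMARK: " + (stalled == NO_WATERMARK));
        System.out.println("advances with idleness:  " + (advancing == watermarks[0]));
    }
}
```

In Flink 1.12 itself, the documented way to mark such an input as idle is WatermarkStrategy.withIdleness(Duration), described under [1]; whether it fits this particular job depends on how the "test" topic is partitioned.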

On 26/02/2021 13:21, Люльченко Юрий Николаевич wrote:

Re[2]: Flink CEP: can't process PatternStream (v 1.12, EventTime mode)

Maminspapin
Dawid,
 
Thank you again for the reply. It really does look like this happened because of the parallel instances.
 
Best,
Yuri L.
 
Friday, 26 February 2021, 15:40 +03:00, from Dawid Wysakowicz <[hidden email]>:
 
