Hello, I’ve already asked the question today and got the solve: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-can-t-process-PatternStream-td41722.html, and it’s clean for me how PatternStream works with ProcessTime. But I need help again, I can’t write proper code to execute PatternStream with EventTime regime. I think the problem is how I assign the watermark strategy. My code is below, version of Flink is 1.12: public class Main {
public static void main(String[] args) throws Exception {
Properties properties = new Properties(); properties.put("group.id", "Flink"); properties.put("bootstrap.servers", "broker:9092");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>( "test", new SimpleStringSchema(), properties);
DataStream<String> stream = env .addSource(consumer) .map((MapFunction<String, String>) s -> { // Just getting an object model return model.toString(); }).assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(20)) .withTimestampAssigner((event, timestamp) -> { Model model = new Gson().fromJson(event, Model.class); return model.getServerTime(); }));
stream.print("Stream");
Pattern<String, String> firstPattern = Pattern .<String>begin("first") .where(new IterativeCondition<String>() { @Override public boolean filter(String s, Context<String> context) throws Exception { return s.contains("Start"); } });
DataStream<String> result = CEP .pattern(stream, firstPattern) .inEventTime() // default TimeCharacteristic for 1.12 .process(new PatternProcessFunction<String, String>() { @Override public void processMatch(Map<String, List<String>> map, Context context, Collector<String> collector) throws Exception { collector.collect(map.get("first").get(0)); } });
result.print("Result");
env.execute(); }
} Please, help me to correct the code ) Thanks,Yuri L. Ответить Переслать Предложить звонок Создать событие ПринятоХорошоВсе понятно, спасибо за информацию |
Hi, What is exactly the problem? Is it that no patterns are being generated? Usually the problem is in idle parallel instances[1]. You need to have data flowing in each of the parallel instances for a watermark to progress. You can also read about it in the aspect of Kafka's partitions[2]. Best, Dawid On 26/02/2021 13:21, Люльченко Юрий
Николаевич wrote:
OpenPGP_signature (855 bytes) Download Attachment |
David, Thank you again for a reply. It really looks like this situation is happened because of the parallel instances. Best, Yuri L. Пятница, 26 февраля 2021, 15:40 +03:00 от Dawid Wysakowicz <[hidden email]>: Люльченко Юрий |
Free forum by Nabble | Edit this page |