Hi all,
In my pipeline setup I cannot see side outputs for a session window (Flink 1.9.1). What I have is:

    messageStream
        .keyBy(tradeKeySelector)
        .window(ProcessingTimeSessionWindows.withDynamicGap(new TradeAggregationGapExtractor()))
        .sideOutputLateData(lateTradeMessages)
        .process(new CumulativeTransactionOperator())
        .name("Aggregate Transaction Builder");

TradeAggregationGapExtractor implements SessionWindowTimeGapExtractor and returns 5 seconds.

Further I have:

    messageStream.getSideOutput(lateTradeMessages)
        .keyBy(tradeKeySelector)
        .process(new KeyedProcessFunction<Long, EnrichedMessage, Transaction>() {
            @Override
            public void processElement(EnrichedMessage value, Context ctx, Collector<Transaction> out) throws Exception {
                System.out.println("Process Late messages For Aggregation");
                out.collect(new Transaction());
            }
        })
        .name("Process Late messages For Aggregation");

The problem is that I never see "Process Late messages For Aggregation" when I'm sending messages with the same key. When the session window passes and I "immediately" send a new message for the same key, it triggers a new session window without going into the late side output.

I'm not sure what I'm doing wrong here. What I would like to achieve here is to catch "late events" and try to reprocess them against the state that was built from the "on time" events for this window or, if that is impossible, report the late events to a special sink. However, it seems I do not have any late events.

I would appreciate any help.

-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
After following the suggestion from SO, I made a few changes, so I am now using event time. Watermarks are progressing; I've checked them in Flink's metrics. The window operator is triggered, but I still don't see any late output.

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
    env.setParallelism(1);
    env.disableOperatorChaining();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setAutoWatermarkInterval(1000);

    DataStream<RawMessage> rawBusinessTransaction = env
        .addSource(new FlinkKafkaConsumer<>("business", new JSONKeyValueDeserializationSchema(false), properties))
        .map(new KafkaTransactionObjectMapOperator())
        .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<RawMessage>() {
            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(System.currentTimeMillis());
            }

            @Override
            public long extractTimestamp(RawMessage element, long previousElementTimestamp) {
                return element.messageCreationTime;
            }
        })
        .name("Kafka Transaction Raw Data Source.");

    messageStream
        .keyBy(tradeKeySelector)
        .window(EventTimeSessionWindows.withDynamicGap(new TradeAggregationGapExtractor()))
        .sideOutputLateData(lateTradeMessages)
        .process(new CumulativeTransactionOperator())
        .name("Aggregate Transaction Builder");
Hi Kristoff,

please check my SO comment and reply. It's not entirely clear to me why it's not working, but I also don't quite understand your use case yet (data examples missing).

Best,
Arvid

On Fri, Jan 3, 2020 at 1:03 PM KristoffSC <[hidden email]> wrote:
> After following suggestion from SO
Hi,
thank you for your SO comment [1]. You are right. Sorry, I misunderstood the "late message" concept. In fact, I was never sending "late events" that should match the window that had just ended. Thank you for your comments and clarification.

[1] https://stackoverflow.com/questions/59570445/late-outputs-missing-for-flinks-session-window/59642942#59642942
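
For anyone who lands on this thread with the same confusion, here is a minimal plain-Java sketch (no Flink dependency) of the lateness rule as I understand it. The class and method names are hypothetical and this is not Flink's actual code. It assumes an event-time window counts as late once the watermark has reached the window's maximum timestamp plus the allowed lateness, and it ignores merging into a session that is still open. As far as I know, processing-time windows never treat an element as late at all, which would explain the first attempt above.

```java
// Hypothetical helper for illustration only; not part of the Flink API.
final class SessionLatenessSketch {

    /**
     * Simplified check: would a lone element with this event timestamp be
     * routed to the late side output of an event-time session window?
     */
    static boolean isLate(long eventTimestamp, long gapMillis,
                          long allowedLatenessMillis, long watermark) {
        // A single element opens the session window [timestamp, timestamp + gap).
        long windowMaxTimestamp = eventTimestamp + gapMillis - 1;
        // Assumed rule: late once the watermark has reached the cleanup time.
        return windowMaxTimestamp + allowedLatenessMillis <= watermark;
    }

    public static void main(String[] args) {
        long gap = 5_000L;          // 5 second session gap, as in the thread
        long watermark = 100_000L;  // assumed current watermark

        // Event stamped "now": its window ends after the watermark,
        // so it simply starts a new session.
        System.out.println(isLate(100_000L, gap, 0L, watermark));

        // Event stamped 10 seconds in the past: its window already ended
        // before the watermark, so it counts as late.
        System.out.println(isLate(90_000L, gap, 0L, watermark));
    }
}
```

Under this model, a message whose creation time is close to the current watermark always opens a fresh session, which matches what was observed above. To see output on the late side output, the message has to carry a timestamp far enough behind the watermark.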