Late outputs for Session Window


KristoffSC
Hi all,
In my pipeline setup I cannot see the late-data side output of my session window (Flink 1.9.1).

What I have is:


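// lateTradeMessages is the OutputTag for late records (the tag id here is illustrative).
final OutputTag<EnrichedMessage> lateTradeMessages =
        new OutputTag<EnrichedMessage>("late-trade-messages") {};

SingleOutputStreamOperator<Transaction> aggregatedTransactions = messageStream
        .keyBy(tradeKeySelector)
        .window(ProcessingTimeSessionWindows.withDynamicGap(new TradeAggregationGapExtractor()))
        .sideOutputLateData(lateTradeMessages)
        .process(new CumulativeTransactionOperator())
        .name("Aggregate Transaction Builder");

TradeAggregationGapExtractor implements SessionWindowTimeGapExtractor and returns a gap of 5 seconds.

Further, I have:

// The late side output is read from the window operator's result, not from its input stream.
aggregatedTransactions.getSideOutput(lateTradeMessages)
        .keyBy(tradeKeySelector)
        .process(new KeyedProcessFunction<Long, EnrichedMessage, Transaction>() {
            @Override
            public void processElement(EnrichedMessage value, Context ctx,
                                       Collector<Transaction> out) throws Exception {
                System.out.println("Process Late messages For Aggregation");
                out.collect(new Transaction());
            }
        })
        .name("Process Late messages For Aggregation");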


The problem is that I never see "Process Late messages For Aggregation" printed when I'm sending messages with the same key.

When the session window has passed and I "immediately" send a new message for the same key, it triggers a new session window instead of going to the late side output.

I'm not sure what I'm doing wrong here.

What I would like to achieve here is to catch "late events" and try to reprocess them against the state that was built for the "on time" events of this window or, if that is impossible, report the late events to a special sink.

However, it seems I do not get any late events at all.

I would appreciate any help.
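
A minimal, Flink-free sketch of the lateness decision may help frame the question. It is a simplified model that assumes Flink's window operator behaves as follows: processing-time windows never classify an element as late; in event time, an element is routed to the late side output only when the last timestamp of the session it falls into (for an element that merges with nothing, its timestamp + gap - 1) plus allowedLateness is at or behind the current watermark. LatenessModel, Outcome and classify are hypothetical names, not Flink API.

```java
/**
 * Hypothetical, simplified model (not Flink code) of how a window operator
 * classifies an element, given the last timestamp of the session it ends up in.
 */
public class LatenessModel {

    public enum Outcome { ON_TIME, LATE_BUT_ACCEPTED, SIDE_OUTPUT }

    /**
     * @param eventTime           false models a processing-time window assigner
     * @param sessionMaxTimestamp last timestamp of the (possibly merged) session;
     *                            for a lone element this is timestamp + gap - 1
     */
    public static Outcome classify(boolean eventTime, long sessionMaxTimestamp,
                                   long allowedLatenessMs, long currentWatermark) {
        if (!eventTime) {
            // Assumption: processing-time windows have no notion of late data.
            return Outcome.ON_TIME;
        }
        if (sessionMaxTimestamp > currentWatermark) {
            // The session has not fired yet.
            return Outcome.ON_TIME;
        }
        if (sessionMaxTimestamp + allowedLatenessMs > currentWatermark) {
            // Already fired, but its state is still retained: the element is added and the session fires again.
            return Outcome.LATE_BUT_ACCEPTED;
        }
        // Session state is already cleaned up: this is what sideOutputLateData(...) would receive.
        return Outcome.SIDE_OUTPUT;
    }

    public static void main(String[] args) {
        long gapMs = 5_000L;
        long watermark = 100_000L;

        // A message sent right after a session closed: its own session ends after the watermark.
        long fresh = 99_000L + gapMs - 1;
        // A message created long before the watermark that merges with no retained session.
        long stale = 90_000L + gapMs - 1;

        System.out.println(classify(false, stale, 0, watermark));      // ON_TIME (processing time)
        System.out.println(classify(true, fresh, 0, watermark));       // ON_TIME
        System.out.println(classify(true, stale, 0, watermark));       // SIDE_OUTPUT
        System.out.println(classify(true, stale, 10_000L, watermark)); // LATE_BUT_ACCEPTED
    }
}
```

Under the same assumptions, the LATE_BUT_ACCEPTED case is what allowedLateness(...) on the windowed stream buys: a fired session's state is kept for that long, so a late element falling into it is added and the window fires again (close to reprocessing against the on-time state), while anything later still ends up on the side output for a special sink.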

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Late outputs for Session Window

KristoffSC
After following a suggestion from SO, I made a few changes, so now I'm using event time.
Watermarks are progressing; I've checked them in Flink's metrics. The
window operator is triggered, but I still don't see any late outputs.


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
env.setParallelism(1);
env.disableOperatorChaining();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000); // getCurrentWatermark() is polled every second


DataStream<RawMessage> rawBusinessTransaction = env
        .addSource(new FlinkKafkaConsumer<>("business",
                new JSONKeyValueDeserializationSchema(false), properties))
        .map(new KafkaTransactionObjectMapOperator())
        .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<RawMessage>() {

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                // The watermark follows wall-clock time, not the extracted event timestamps.
                return new Watermark(System.currentTimeMillis());
            }

            @Override
            public long extractTimestamp(RawMessage element, long previousElementTimestamp) {
                // Event time is the message's creation time.
                return element.messageCreationTime;
            }
        })
        .name("Kafka Transaction Raw Data Source.");

messageStream
        .keyBy(tradeKeySelector)
        .window(EventTimeSessionWindows.withDynamicGap(new TradeAggregationGapExtractor()))
        .sideOutputLateData(lateTradeMessages)
        .process(new CumulativeTransactionOperator())
        .name("Aggregate Transaction Builder");
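
The behaviour of this setup can be sketched without Flink. The simulation below is a simplified model for a single key with the 5-second gap and no allowedLateness, assuming Flink's session merging and lateness rules as understood here (windows that overlap or touch are merged; a session fires once its last timestamp is at or behind the watermark, and its state is then dropped). Element timestamps stand in for messageCreationTime and the watermark values stand in for the wall-clock watermark from getCurrentWatermark(). SessionSimulation and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Hypothetical, simplified model (not Flink code) of event-time session windows
 * for a single key, with a 5 s gap and no allowed lateness.
 */
public class SessionSimulation {
    static final long GAP_MS = 5_000L; // the gap TradeAggregationGapExtractor is said to return

    private final List<long[]> openSessions = new ArrayList<>(); // {start, endExclusive}, not yet fired
    private final List<String> fired = new ArrayList<>();
    private final List<Long> late = new ArrayList<>();
    private long watermark = Long.MIN_VALUE;

    public void processElement(long timestamp) {
        long start = timestamp;
        long end = timestamp + GAP_MS;
        // Merge the element's own session [t, t + gap) with every open session it overlaps or touches.
        Iterator<long[]> it = openSessions.iterator();
        while (it.hasNext()) {
            long[] s = it.next();
            if (s[0] <= end && s[1] >= start) {
                start = Math.min(start, s[0]);
                end = Math.max(end, s[1]);
                it.remove();
            }
        }
        if (end - 1 <= watermark) {
            // The session would already be complete and its state is gone: late side output.
            late.add(timestamp);
        } else {
            openSessions.add(new long[] {start, end});
        }
    }

    public void processWatermark(long newWatermark) {
        watermark = newWatermark;
        Iterator<long[]> it = openSessions.iterator();
        while (it.hasNext()) {
            long[] s = it.next();
            if (s[1] - 1 <= watermark) { // last timestamp of the session is covered by the watermark
                fired.add("[" + s[0] + ", " + s[1] + ")");
                it.remove();             // no allowed lateness: state is dropped after firing
            }
        }
    }

    public List<String> firedSessions() { return fired; }

    public List<Long> lateElements() { return late; }

    public static void main(String[] args) {
        SessionSimulation sim = new SessionSimulation();
        sim.processElement(1_000L);   // opens [1000, 6000)
        sim.processElement(3_000L);   // merges into [1000, 8000)
        sim.processWatermark(8_000L); // fires [1000, 8000)
        sim.processElement(8_500L);   // sent "immediately" afterwards: opens a new session
        sim.processElement(2_000L);   // created well before the watermark: late
        sim.processWatermark(20_000L);
        System.out.println("fired: " + sim.firedSessions());
        System.out.println("late:  " + sim.lateElements());
    }
}
```

Under these assumptions, main reports [1000, 8000) and [8500, 13500) as fired and only the element with timestamp 2000 as late: the message sent right after the first session fired simply opens a new session, and only an element whose own session would already have ended at the watermark reaches the late side output.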

Re: Late outputs for Session Window

Arvid Heise-3
Hi Kristoff,

Please check my SO comment and reply.

It's not entirely clear to me why it's not working, but I also don't quite understand your use case yet (data examples are missing).

Best,

Arvid

On Fri, Jan 3, 2020 at 1:03 PM KristoffSC <[hidden email]> wrote:

Re: Late outputs for Session Window

KristoffSC
Hi,
Thank you for your SO comment [1]. You are right; sorry, I misunderstood the concept of "late messages".
In fact, I was never sending "late events" that would have matched the window that had just ended.

Thank you for your comments and clarification.


[1]
https://stackoverflow.com/questions/59570445/late-outputs-missing-for-flinks-session-window/59642942#59642942