sideOutputLateData not propagating late reports once window expires

sideOutputLateData not propagating late reports once window expires

cslotterback

Hey Flink Users,

 

I am having trouble getting sideOutputLateData to work with late event-time reports. As I understand it, the following code should route reports that arrive after the window has fired and beyond the allowed lateness to the side output:

 

val lateTag = new OutputTag[…]("tag"){}

val windowedStream = stream
  .keyBy(…)
  .window(TumblingEventTimeWindows.of(…))
  .allowedLateness(…)
  .sideOutputLateData(lateTag)
  .trigger(new myTrigger)
  .aggregate(new VSGBondingGroupMediansAggregator(packetFilterCount))

val lateStream = windowedStream.getSideOutput(lateTag)

 

 

trigger:

public class myTrigger extends Trigger<…>, Window> {
    @Override
    public TriggerResult onElement(…) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime() throws Exception {
        throw new Exception("processing time not supported");
    }

    @Override
    public TriggerResult onEventTime() throws Exception {
        return TriggerResult.FIRE_AND_PURGE;
    }

 

The main flow works when all reports are on time, but when I introduce late reports, the window rejects them and they never reach the late stream. Is there something wrong with my understanding?

 

Chris

Re: sideOutputLateData not propagating late reports once window expires

Chesnay Schepler
Please try this:

val windowedStream = stream
  .keyBy(…)
  .window(TumblingEventTimeWindows.of(…))
  .allowedLateness(…)
  .sideOutputLateData(lateTag)
  .trigger(new myTrigger)

val lateStream = windowedStream.getSideOutput(lateTag)

val aggregatedStream = windowedStream.aggregate(new VSGBondingGroupMediansAggregator(packetFilterCount))

On 5/10/2021 9:56 PM, Slotterback, Chris wrote:


Re: [EXTERNAL] Re: sideOutputLateData not propagating late reports once window expires

cslotterback

Hi Chesnay,

 

That doesn’t compile: WindowedStream has no getSideOutput method; only SingleOutputStreamOperator provides it.

 

Chris

 

From: Chesnay Schepler <[hidden email]>
Date: Tuesday, May 11, 2021 at 6:09 AM
To: "Slotterback, Chris" <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: [EXTERNAL] Re: sideOutputLateData not propagating late reports once window expires

 


 

Re: [EXTERNAL] Re: sideOutputLateData not propagating late reports once window expires

Chesnay Schepler
Ah, sorry for the compile issue.

I wasn't able to reproduce the issue; conceptually your code looks fine.

Can you provide us with a self-contained reproducer for the issue?

For reference, here's the test I used; you can adjust it as necessary to replicate your use case:

@Test
public void testWindowLateDataSideOutput() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    final OutputTag<Integer> lateTag = new OutputTag<>("tag") {};

    final SingleOutputStreamOperator<Integer> windowedStream =
            env.addSource(new MySource())
                    .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
                    .keyBy(x -> x)
                    .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
                    .allowedLateness(Time.of(1, TimeUnit.SECONDS))
                    .sideOutputLateData(lateTag)
                    .trigger(new MyTrigger())
                    .reduce(Integer::sum);

    final DataStream<Integer> lateStream = windowedStream.getSideOutput(lateTag);
    lateStream.map(x -> "late: " + x).print();

    env.execute();
}

private static class MySource implements SourceFunction<Integer> {
    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        final long lateTime = 1000;
        long currentTime = 5000;

        while (true) {
            ctx.collectWithTimestamp(1, lateTime);
            ctx.collectWithTimestamp(2, currentTime);
            currentTime += 100;
        }
    }

    @Override
    public void cancel() {}
}
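A rough plain-Java sketch of why the element with timestamp 1000 in the test above should end up in the late side output. It uses no Flink classes; the class and method names are made up for illustration, and it assumes a zero window offset, a window that ends at start + size - 1, and a watermark that trails the highest timestamp seen by 1 ms.

```java
// Plain-Java model of the lateness check; names are hypothetical, not Flink API.
final class LatenessSketch {

    // Start of the tumbling window (zero offset assumed) containing the timestamp.
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // An element counts as late once the watermark has reached the last
    // timestamp of its window plus the allowed lateness.
    static boolean isLate(long timestamp, long size, long allowedLateness, long watermark) {
        long maxTimestamp = windowStart(timestamp, size) + size - 1;
        return maxTimestamp + allowedLateness <= watermark;
    }

    public static void main(String[] args) {
        long size = 1000;      // 1 second window, as in the test
        long lateness = 1000;  // 1 second allowed lateness, as in the test
        long watermark = 5000 - 1; // assumed: highest timestamp seen minus 1 ms

        System.out.println(isLate(1000, size, lateness, watermark)); // true
        System.out.println(isLate(5000, size, lateness, watermark)); // false
    }
}
```

Under these assumptions the window for timestamp 1000 is cleaned up at 2999, so once the watermark has advanced to 4999 every further element with that timestamp is late, while elements stamped with the current time are not.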

On 5/11/2021 6:42 PM, Slotterback, Chris wrote:


 


Re: [EXTERNAL] Re: sideOutputLateData not propagating late reports once window expires

cslotterback

Chesnay,

 

Thanks for the help. I used your example as a baseline for mine and got it working. For anyone who finds this in the future: I had an assignTimestampsAndWatermarks call (along with name and uid calls) attached to the end of the stream I was calling getSideOutput on. I had wrongly assumed that those three calls had no effect on accessing side outputs, but the watermark assigner seems to modify the chain and prevents access to the late side output once it is appended. I moved the watermark assigner further down the chain, after accessing the side output, and it now works as expected.

 

Thanks for the help!

Chris
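The behaviour described above can be pictured with a small toy model. This is not Flink code: the Operator class and its methods are hypothetical, and the model simply assumes that a side output belongs to the operator that emits it, that chaining a further transformation (such as a watermark assigner) yields a different operator, and that naming calls leave the operator unchanged.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model, not Flink code: each operator owns its own side outputs.
final class SideOutputSketch {

    static final class Operator {
        final String name;
        final Map<String, List<Integer>> sideOutputs = new HashMap<>();

        Operator(String name) { this.name = name; }

        // Chaining a new transformation yields a different operator object.
        Operator then(String nextName) { return new Operator(nextName); }

        // Metadata-style calls (assumed here for naming) keep the same operator.
        Operator named(String label) { return this; }

        void emitToSide(String tag, int value) {
            sideOutputs.computeIfAbsent(tag, k -> new ArrayList<>()).add(value);
        }

        List<Integer> getSideOutput(String tag) {
            return sideOutputs.getOrDefault(tag, new ArrayList<>());
        }
    }

    public static void main(String[] args) {
        Operator window = new Operator("window");
        window.emitToSide("tag", 1); // the window operator routes a late element to the tag

        Operator downstream = window.then("watermarks");

        System.out.println(window.named("agg").getSideOutput("tag")); // [1]
        System.out.println(downstream.getSideOutput("tag"));          // []
    }
}
```

In this picture, asking the downstream operator for the tag returns nothing because only the window operator ever wrote to it, which matches the fix of calling getSideOutput before appending the watermark assigner.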

 

From: Chesnay Schepler <[hidden email]>
Date: Wednesday, May 12, 2021 at 5:24 AM
To: "Slotterback, Chris" <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: [EXTERNAL] Re: sideOutputLateData not propagating late reports once window expires

 
