Anomaly in handling late arriving data


Indraneel R
Hi Everyone,

I am trying to execute this simple sessionization pipeline, with the allowed lateness shown below:

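// The imports and the Event case class were not included in the post. The Event fields
// are assumed from how the class is used below (eventId is a hypothetical field name).
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class Event(userId: String, eventId: String, timestamp: Long, sessionId: String = "", count: Int = 0)

def main(args: Array[String]): Unit = {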
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(2)


    val source: DataStream[Event] = env.addSource(new SourceFunction[Event] {
      lazy val input: Seq[Event] = Seq(
        Event("u1", "e1", 1L),
        Event("u1", "e5", 6L),
        Event("u1", "e7", 11L),
        Event("u1", "e8", 12L),
        Event("u1", "e9", 16L),
        Event("u1", "e11", 14L),
        Event("u1", "e12", 8L),
        Event("u1", "e13", 20L),
      )

      override def run(ctx: SourceFunction.SourceContext[Event]): Unit = {
        {
          input.foreach(event => {
            ctx.collectWithTimestamp(event, event.timestamp)
            ctx.emitWatermark(new Watermark(event.timestamp - 1))
          })
          ctx.emitWatermark(new Watermark(Long.MaxValue))
        }
      }

      override def cancel(): Unit = {}
    })

    val tag: OutputTag[Event] = OutputTag("late-data")

    val sessionizedStream: DataStream[Event] = source
      .keyBy(item => item.userId)
      .window(EventTimeSessionWindows.withGap(Time.milliseconds(3L)))
      .sideOutputLateData(tag)
      .allowedLateness(Time.milliseconds(2L))
      .process(new ProcessWindowFunction[Event, Event, String, TimeWindow] {

        override def process(key: String, context: Context, elements: Iterable[Event], out: Collector[Event]): Unit = {
          val sessionIdForWindow = key + "-" + context.currentWatermark + "-" + context.window.getStart

          elements.toSeq
            .sortBy(event => event.timestamp)
            .foreach(event => {
              out.collect(event.copy(sessionId = sessionIdForWindow, count = elements.size))
            })
        }
      })

    sessionizedStream.getSideOutput(tag).print()
    env.execute()
  }

But here's the problem. I am expecting event e12 (timestamp 8L in the input above) to be collected in the side output as a late event.
But it isn't. The event is not printed.
What's interesting is that if I make any one of the following changes, e12 is considered late and is printed:
       1) Event("u1", "e12", 8L) changed to Event("u1", "e12", 7L)
       2) allowedLateness(Time.milliseconds(2L)) changed to allowedLateness(Time.milliseconds(1L))
       3) Event("u1", "e12", 8L) changed to Event("u1", "e12", 7L) AND
          allowedLateness(Time.milliseconds(2L)) changed to allowedLateness(Time.milliseconds(4L))   // or anything less than 7L

Can someone explain what's going on? What am I missing here?

regards
-Indraneel
Re: Anomaly in handling late arriving data

Zhu Zhu
Hi Indraneel,

In your case, ("u1", "e12", 8L) is not considered late and is merged into the session window {e7,e8,e9,e11} (range=11~19).
This is because 8 + 3 (the session gap) >= 11, the lower bound of the existing session window.
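
The merge condition above can be checked with a small stand-alone sketch. It does not use Flink. `Win`, `cover`, and `MergeCheck` are hypothetical names, and the `intersects` test is written on the assumption that it mirrors Flink's `TimeWindow.intersects`, where two windows that merely touch still count as overlapping.

```scala
// Minimal stand-in for a session window covering [start, end), end exclusive.
final case class Win(start: Long, end: Long) {
  // Assumption: same rule as Flink's TimeWindow.intersects; touching windows overlap.
  def intersects(o: Win): Boolean = start <= o.end && end >= o.start
  // Smallest window spanning both windows.
  def cover(o: Win): Win = Win(math.min(start, o.start), math.max(end, o.end))
}

object MergeCheck {
  val gap = 3L

  // Each event initially gets its own window [ts, ts + gap).
  def sessionFor(ts: Long): Win = Win(ts, ts + gap)

  // Span of the per-event windows of e7, e8, e9 and e11 (timestamps 11, 12, 16, 14).
  val existing: Win = Seq(11L, 12L, 16L, 14L).map(sessionFor).reduce(_ cover _)

  def main(args: Array[String]): Unit = {
    println(existing)                             // Win(11,19)
    println(sessionFor(8L).intersects(existing))  // true:  [8,11) touches [11,19)
    println(sessionFor(7L).intersects(existing))  // false: [7,10) ends before 11
  }
}
```

With timestamp 8 the new window ends exactly at the start of the existing session, which is enough for the two to be merged.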

Regarding your 3 questions:
>> 1) Event("u1", "e12", 8L) change to Event("u1", "e12", 7L)
7 + 3 < 11, so e12 will not be merged into the session window {e7,e8,e9,e11}.
Its own window is already past the allowed lateness, so it is emitted to the late-data side output.

>> 2) allowedLateness(Time.milliseconds(2L)) change to allowedLateness(Time.milliseconds(1L)) 
Reducing the allowedLateness causes window {e7,e8} to be fired and purged when e9 arrives.
So when e12 arrives, the existing session window is {e9,e11} (range=14~19).
e12 does not reach that window (8 + 3 < 14), so it is considered late in this case.
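
A small sketch of why the allowed lateness decides whether window {e7,e8} is still around when e11 arrives. It is plain Scala with hypothetical names (`PurgeTiming`, `purged`), and it assumes that window state is dropped once the window's maximum timestamp plus the allowed lateness is no greater than the current watermark.

```scala
object PurgeTiming {
  // Window holding e7 and e8 covers [11, 15), so its maximum timestamp is 14.
  val maxTimestamp = 14L

  // The source emits Watermark(timestamp - 1) right after e9 (timestamp 16).
  val watermarkAfterE9: Long = 16L - 1

  // Assumption: state is purged once maxTimestamp + allowedLateness <= watermark.
  def purged(allowedLateness: Long): Boolean =
    maxTimestamp + allowedLateness <= watermarkAfterE9

  def main(args: Array[String]): Unit = {
    println(purged(2L)) // false: 14 + 2 = 16 > 15, the window is still there for e11 to merge with
    println(purged(1L)) // true:  14 + 1 = 15 <= 15, the window is gone before e11 arrives
  }
}
```

Under that assumption, with a lateness of 1 the window starting at 11 no longer exists, so nothing remains for e12's window [8,11) to touch.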

>> 3)   Event("u1", "e12", 8L) change to Event("u1", "e12", 7L) AND allowedLateness(Time.milliseconds(2L)) change to allowedLateness(Time.milliseconds(4L)) 
The same as case 1).
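
The lateness decisions for e12 can be reproduced with the sketch below. It is plain Scala, not Flink code. `LatenessCheck` and `isLate` are hypothetical names, and the rule used (a window is late once its maximum timestamp plus the allowed lateness is no greater than the watermark) is an assumption about how the window operator decides.

```scala
object LatenessCheck {
  val gap = 3L

  // Timestamps in arrival order before e12: e1, e5, e7, e8, e9, e11.
  val beforeE12: Seq[Long] = Seq(1L, 6L, 11L, 12L, 16L, 14L)

  // The source emits Watermark(timestamp - 1) after each event. Watermarks never
  // move backwards, so the effective watermark is the largest one seen so far.
  val watermark: Long = beforeE12.map(_ - 1).max

  // Assumption: late once (windowEnd - 1) + allowedLateness <= watermark.
  def isLate(windowEnd: Long, allowedLateness: Long): Boolean =
    (windowEnd - 1) + allowedLateness <= watermark

  def main(args: Array[String]): Unit = {
    println(watermark)              // 15
    println(isLate(19L, 2L))        // false: e12 at 8 joins the window ending at 19
    println(isLate(7L + gap, 2L))   // true:  9 + 2 = 11 <= 15
    println(isLate(7L + gap, 4L))   // true:  9 + 4 = 13 <= 15
    println(isLate(7L + gap, 7L))   // false: 9 + 7 = 16 > 15
  }
}
```

The last two lines match the observation in the question that, with e12 at timestamp 7, any allowed lateness below 7 still sends it to the side output.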

Thanks,
Zhu Zhu

In reply to Indraneel R <[hidden email]>, Thursday, September 26, 2019, 2:24 AM