Anomaly in handling late arriving data

Posted by Indraneel R on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Anomaly-in-handling-late-arriving-data-tp30218.html

Hi Everyone,

I am trying to execute this simple sessionization pipeline, with the allowed lateness shown below:

def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(2)


    val source: DataStream[Event] = env.addSource(new SourceFunction[Event] {
      lazy val input: Seq[Event] = Seq(
        Event("u1", "e1", 1L),
        Event("u1", "e5", 6L),
        Event("u1", "e7", 11L),
        Event("u1", "e8", 12L),
        Event("u1", "e9", 16L),
        Event("u1", "e11", 14L),
        Event("u1", "e12", 8L),
        Event("u1", "e13", 20L),
      )

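      override def run(ctx: SourceFunction.SourceContext[Event]): Unit = {
        input.foreach(event => {
          // Emit each event with its own timestamp, then a watermark trailing it by 1 ms.
          ctx.collectWithTimestamp(event, event.timestamp)
          ctx.emitWatermark(new Watermark(event.timestamp - 1))
        })
        // Final watermark so that all remaining windows fire.
        ctx.emitWatermark(new Watermark(Long.MaxValue))
      }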

      override def cancel(): Unit = {}
    })

    val tag: OutputTag[Event] = OutputTag("late-data")

    val sessionizedStream: DataStream[Event] = source
      .keyBy(item => item.userId)
      .window(EventTimeSessionWindows.withGap(Time.milliseconds(3L)))
      .sideOutputLateData(tag)
      .allowedLateness(Time.milliseconds(2L))
      .process(new ProcessWindowFunction[Event, Event, String, TimeWindow] {

        override def process(key: String, context: Context, elements: Iterable[Event], out: Collector[Event]): Unit = {
          val sessionIdForWindow = key + "-" + context.currentWatermark + "-" + context.window.getStart

          elements.toSeq
            .sortBy(event => event.timestamp)
            .foreach(event => {
              out.collect(event.copy(sessionId = sessionIdForWindow, count = elements.size))
            })
        }
      })

    sessionizedStream.getSideOutput(tag).print()
    env.execute()
  }

But here's the problem: I expect event e12, i.e. Event("u1", "e12", 8L) in the input above, to be collected in the side output as a late event.
But it isn't. The event is not printed.
What's interesting is that if I make any one of the following changes, e12 is considered late and is printed:
       1) Change Event("u1", "e12", 8L) to Event("u1", "e12", 7L)
       2) Change allowedLateness(Time.milliseconds(2L)) to allowedLateness(Time.milliseconds(1L))
       3) Change Event("u1", "e12", 8L) to Event("u1", "e12", 7L) AND
          change allowedLateness(Time.milliseconds(2L)) to allowedLateness(Time.milliseconds(4L)) (or any value less than 7L)
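The observations above can be replayed by hand without running Flink. The following is a minimal sketch in plain Scala, not the actual window operator code. It rests on three assumptions: an element is late when the maximum timestamp of its (merged) window plus the allowed lateness is at or below the current watermark; a window's state is dropped under the same condition; and two session windows merge even when they only touch at a boundary (one ends exactly where the other starts). The names `LatenessSketch`, `Win`, and `lateEvents` are hypothetical and exist only for this illustration. The sketch also assumes a single key and that each watermark reaches the window operator right after the element that precedes it.

```scala
object LatenessSketch {
  // Half-open window [start, end); a simplified stand-in for a session window.
  final case class Win(start: Long, end: Long) {
    def maxTimestamp: Long = end - 1
    // Assumption: windows that only touch (end == other.start) also merge.
    def intersects(o: Win): Boolean = start <= o.end && end >= o.start
    def cover(o: Win): Win = Win(math.min(start, o.start), math.max(end, o.end))
  }

  // Returns the ids of the events this model classifies as late.
  def lateEvents(events: Seq[(String, Long)], gap: Long, lateness: Long): Seq[String] = {
    var watermark = Long.MinValue
    var live = List.empty[Win]          // windows whose state has not been dropped yet
    val late = Seq.newBuilder[String]
    for ((id, ts) <- events) {
      // Each element starts as its own window [ts, ts + gap) and absorbs intersecting ones.
      val (hit, rest) = live.partition(_.intersects(Win(ts, ts + gap)))
      val merged = hit.foldLeft(Win(ts, ts + gap))(_ cover _)
      // Lateness is checked against the merged window, not the element's own window.
      if (merged.maxTimestamp + lateness <= watermark) late += id
      else live = merged :: rest
      // The source emits watermark (ts - 1) after each element; watermarks never go back.
      watermark = math.max(watermark, ts - 1)
      // Drop windows whose cleanup time (maxTimestamp + lateness) has passed.
      live = live.filter(w => w.maxTimestamp + lateness > watermark)
    }
    late.result()
  }

  // Event ids and timestamps copied from the pipeline's input.
  val input: Seq[(String, Long)] = Seq(
    "e1" -> 1L, "e5" -> 6L, "e7" -> 11L, "e8" -> 12L,
    "e9" -> 16L, "e11" -> 14L, "e12" -> 8L, "e13" -> 20L)

  // Same input, but with e12 moved from timestamp 8 to 7.
  val inputE12At7: Seq[(String, Long)] =
    input.map { case ("e12", _) => "e12" -> 7L; case other => other }

  def main(args: Array[String]): Unit = {
    println(lateEvents(input, gap = 3L, lateness = 2L))       // original: nothing late
    println(lateEvents(inputE12At7, gap = 3L, lateness = 2L)) // change 1: e12 late
    println(lateEvents(input, gap = 3L, lateness = 1L))       // change 2: e12 late
    println(lateEvents(inputE12At7, gap = 3L, lateness = 4L)) // change 3: e12 late
  }
}
```

Under these assumptions the model matches all four observations. In the original setup the session [11, 19) is still alive when e12 arrives (watermark 15), and e12's own window [8, 11) touches it at 11, so e12 is absorbed into the session instead of being judged late. With timestamp 7 the window [7, 10) no longer touches the session, and with an allowed lateness of 1 the window [11, 15) has already been dropped at watermark 15, so the surviving session starts at 14. Whether boundary-touching windows really merge in the Flink version in use should be confirmed against its source or documentation.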

Can someone explain what's going on? What am I missing here?

regards
-Indraneel