sideOutputLateData doesn't work with map()

orips
Hi,

I have this simple flow:

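import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// Late elements are routed to the side output identified by this tag
val tag = OutputTag[Tuple1[Int]]("late")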
val stream = senv
  .addSource(new SourceFunction[Int] {
    override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
      (10000 to 10090).foreach(ctx.collect)
      Thread.sleep(1000)
      (20 to 30).foreach(ctx.collect)
    }
    override def cancel(): Unit = {}
  })
  .map(x => Tuple1(x))
  .assignAscendingTimestamps(_._1)
  .keyBy(_ => 1)
  .window(EventTimeSessionWindows.withGap(Time.milliseconds(2000)))
  .sideOutputLateData(tag)
  .process(new ProcessWindowFunction[Tuple1[Int], List[Int], Int, TimeWindow] {
    override def process(key: Int, context: Context, elements: Iterable[Tuple1[Int]], out: Collector[List[Int]]): Unit = {
      out.collect(elements.map(_._1).toList)
    }
  })
stream
  .print()
stream
  .getSideOutput(tag)
  .map(a => s"late: $a")
  .print()

senv.execute()

This is a simple stream that uses a session window on integers and then uses process(…) to collect them into a list. There is also a side output for late data.
When I run this job, the late messages are printed to stdout without any problem.

However, when I add a map(…) after process(…), the late data no longer shows up in the side output and nothing is printed to stdout:
  .sideOutputLateData(tag)
  .process(…)
  .map(list => list :+ 42)

Is this a bug, or is it working as intended? If it's not a bug, does it mean I cannot add any operator after process(…)?

Thanks

Re: sideOutputLateData doesn't work with map()

orips

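It turns out this is the way to solve the problem: take the side output from the stream returned by process(…), and apply the map(…) to that same stream as a separate branch: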

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tag = OutputTag[Tuple1[Int]]("late")
val stream = senv
  .addSource(new SourceFunction[Int] {
    override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
      (10000 to 10090).foreach(ctx.collect)
      Thread.sleep(1000)
      (20 to 30).foreach(ctx.collect)
    }
    override def cancel(): Unit = {}
  })
  .map(x => Tuple1(x))
  .assignAscendingTimestamps(_._1)
  .keyBy(_ => 1)
  .window(EventTimeSessionWindows.withGap(Time.milliseconds(2000)))
  .sideOutputLateData(tag)
  .process(new ProcessWindowFunction[Tuple1[Int], List[Int], Int, TimeWindow] {
    override def process(key: Int, context: Context, elements: Iterable[Tuple1[Int]], out: Collector[List[Int]]): Unit = {
      out.collect(elements.map(_._1).toList)
    }
  })
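// Read the late data from the stream returned by process, not from a downstream operator
stream
  .getSideOutput(tag)
  .map(a => s"late: $a")
  .print()
// Apply the extra map as a separate branch of the same stream
stream
  .map(list => list :+ 42)
  .print()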

senv.execute()

On Thu, Sep 17, 2020 at 3:32 PM Ori Popowski <[hidden email]> wrote:

Re: sideOutputLateData doesn't work with map()

Chesnay Schepler
This is working as intended, but it is admittedly inconvenient.
The reason the original version does not work is that the side output is scoped to the DataStream that the process function creates. The map function creates another DataStream, and that DataStream does not retain the side output of the previous one, so calling getSideOutput(tag) on it yields no elements.
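The scoping rule can be illustrated without Flink using a small, hypothetical Scala model. `OpResult`, `SideOutputScopingDemo`, and all their members are invented for this sketch and are not part of the Flink API; the model simply assumes, as described above, that a side output belongs only to the operator that emitted it and is not inherited by downstream operators.

```scala
// Hypothetical toy model, NOT the Flink API: it only illustrates the scoping rule.
// Each operator result carries its main output plus the side outputs that this
// particular operator emitted, keyed by tag id.
final case class OpResult[A, S](main: Vector[A], sideOutputs: Map[String, Vector[S]]) {

  // Analogue of DataStream.map: a new operator produces a new result.
  // It emits no side outputs of its own, and the upstream ones are not carried over.
  def map[B](f: A => B): OpResult[B, S] =
    OpResult(main.map(f), Map.empty[String, Vector[S]])

  // Analogue of getSideOutput(tag): it only looks at this operator's own side
  // outputs, so asking a downstream operator for the tag yields an empty result.
  def getSideOutput(tag: String): Vector[S] =
    sideOutputs.getOrElse(tag, Vector.empty)
}

object SideOutputScopingDemo {
  // Stand-in for the result of `.sideOutputLateData(tag).process(...)` in the job
  // from this thread: on-time values collected into one list, late values (20 to 30)
  // under the "late" tag. Plain Ints replace Tuple1[Int] for brevity.
  val windowed: OpResult[List[Int], Int] =
    OpResult(Vector((10000 to 10090).toList), Map("late" -> (20 to 30).toVector))

  // Broken variant: map first, then ask the *map* result for the side output.
  val brokenLate: Vector[Int] =
    windowed.map(list => list :+ 42).getSideOutput("late")

  // Working variant (the fix posted in the thread): branch both outputs off `windowed`.
  val fixedLate: Vector[Int] = windowed.getSideOutput("late")
  val fixedMain: Vector[List[Int]] = windowed.map(list => list :+ 42).main
}
```

In this model `brokenLate` is empty while `fixedLate` holds the eleven late values, which mirrors the behaviour reported in the thread: keeping a handle to the process result and branching from it lets you add further operators without losing the side output.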

On 9/17/2020 3:21 PM, Ori Popowski wrote: