Hi, I have this simple flow:

    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
    import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector

    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val tag = OutputTag[Tuple1[Int]]("late")

    val stream = senv
      .addSource(new SourceFunction[Int] {
        override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
          (10000 to 10090).foreach(ctx.collect)
          Thread.sleep(1000)
          // These arrive after the watermark has moved past them, so they are late
          (20 to 30).foreach(ctx.collect)
        }
        override def cancel(): Unit = {}
      })
      .map(x => Tuple1(x))
      .assignAscendingTimestamps(_._1)
      .keyBy(_ => 1)
      .window(EventTimeSessionWindows.withGap(Time.milliseconds(2000)))
      .sideOutputLateData(tag)
      .process(new ProcessWindowFunction[Tuple1[Int], List[Int], Int, TimeWindow] {
        override def process(key: Int, context: Context, elements: Iterable[Tuple1[Int]],
                             out: Collector[List[Int]]): Unit = {
          // Collect every element of the session window into one list
          out.collect(elements.map(_._1).toList)
        }
      })

    stream
      .print()

    stream
      .getSideOutput(tag)
      .map(a => s"late: $a")
      .print()

    senv.execute()

This is a simple stream that uses a session window on integers and then uses process(…) to collect them into a list. There is also a side output for late data.

When I run this job, the late messages are printed to stdout without any problem. However, when I add a map(…) after process(…), the late data no longer reaches the side output and nothing is printed:

    …
    .sideOutputLateData(tag)
    .process(…)
    .map(list => list :+ 42)
    …

Is this a bug, or is it working as intended? If it's not a bug, does it mean I cannot add any operator after process(…)?

Thanks
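It turns out that the solution is to call getSideOutput(tag) on the stream returned by process(…) and to apply the map(…) as a separate branch of that same stream:

    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
    import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector

    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val tag = OutputTag[Tuple1[Int]]("late")

    val stream = senv
      .addSource(new SourceFunction[Int] {
        override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
          (10000 to 10090).foreach(ctx.collect)
          Thread.sleep(1000)
          (20 to 30).foreach(ctx.collect)
        }
        override def cancel(): Unit = {}
      })
      .map(x => Tuple1(x))
      .assignAscendingTimestamps(_._1)
      .keyBy(_ => 1)
      .window(EventTimeSessionWindows.withGap(Time.milliseconds(2000)))
      .sideOutputLateData(tag)
      .process(new ProcessWindowFunction[Tuple1[Int], List[Int], Int, TimeWindow] {
        override def process(key: Int, context: Context, elements: Iterable[Tuple1[Int]],
                             out: Collector[List[Int]]): Unit = {
          out.collect(elements.map(_._1).toList)
        }
      })

    // Take the side output from the stream that process(…) returned
    stream
      .getSideOutput(tag)
      .map(a => s"late: $a")
      .print()

    // The extra map is a separate branch of the same stream
    stream
      .map(list => list :+ 42)
      .print()

    senv.execute()

On Thu, Sep 17, 2020 at 3:32 PM Ori Popowski wrote: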
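This is working as intended, but it is admittedly inconvenient.

The reason the original version does not work is that a side output is scoped to the DataStream created by the operator that emits it, here the window operator running your process function. The map function creates another DataStream, and that stream does not retain the side outputs of the previous one, so calling getSideOutput(tag) on it yields a stream that never receives any elements. You can still add operators after process(…); just request the side output from the stream that process(…) returns.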
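To make the scoping concrete, here is a small plain-Scala toy model. It is not Flink's API or its implementation: ToyStream, windowStandIn and SideOutputScopeDemo are hypothetical names, and lateness is decided by a predicate instead of by watermarks. It only mirrors the behaviour described above: each stream carries its own side outputs, and map(…) returns a fresh stream that has none.

```scala
// Toy model of operator-scoped side outputs. NOT Flink's API or internals;
// ToyStream and windowStandIn are hypothetical names used for illustration.
final case class ToyStream[T](
    elements: List[T],
    sideOutputs: Map[String, List[Any]] = Map.empty
) {
  // A downstream operator produces a brand-new stream: only the main elements
  // flow through, so the new stream starts with no side outputs of its own.
  def map[U](f: T => U): ToyStream[U] = ToyStream(elements.map(f))

  // Side outputs are looked up on this stream only, never on upstream ones.
  def getSideOutput(tag: String): List[Any] = sideOutputs.getOrElse(tag, Nil)
}

object SideOutputScopeDemo {
  // Stand-in for the window + process step: elements matching isLate go to the
  // "late" side output, the rest are collected into a single list.
  // Simplification: lateness comes from a predicate, not from watermarks.
  def windowStandIn(input: List[Int], isLate: Int => Boolean): ToyStream[List[Int]] = {
    val (late, onTime) = input.partition(isLate)
    ToyStream(List(onTime), Map("late" -> late))
  }

  def main(args: Array[String]): Unit = {
    val input = (10000 to 10090).toList ++ (20 to 30).toList
    val windowed = windowStandIn(input, _ < 10000)

    // Original attempt: side output requested from the mapped stream.
    val mapped = windowed.map(list => list :+ 42)
    println(s"late via mapped:   ${mapped.getSideOutput("late")}")

    // Working version: side output requested from the stream that emitted it.
    println(s"late via windowed: ${windowed.getSideOutput("late")}")
  }
}
```

Running main prints an empty list for the mapped stream and the values 20 through 30 for the windowed one, which is the same difference observed between the two versions of the job.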
On 9/17/2020 3:21 PM, Ori Popowski wrote: