Union SingleOutputStreamOperator and getSideOutput doesn't work


Efrat Abramovitz

Cheers,


We have stumbled upon an issue with unioning streams that have a tagged side output: after the union, it seems we can no longer extract the side output.


Issue: 

A SingleOutputStreamOperator stream ceases to be a SingleOutputStreamOperator after a union, so getSideOutput can no longer be called on it.

Specifically, the Scala docs give no indication of the existence or importance of the SingleOutputStreamOperator type when using side outputs.


The documentation <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html> does not make clear under what conditions data streams keep their side outputs, and the code below fails with a surprising MatchError:

Language: Scala

Flink version: 1.9


import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// The enclosing object and the three vals below were not shown in the
// original post; their names and values here are assumed.
object UnionSideOutputJob {
  val EVEN = OutputTag[Int]("even")
  val ODD = OutputTag[Int]("odd")
  val JOB_NAME = "union-side-output"

  def main(args: Array[String]): Unit = {
    val flink = StreamExecutionEnvironment.getExecutionEnvironment

    val dataStream1 = processInt(flink.fromCollection(Seq(1, 3, 5, 7, 9, 12)))
    val dataStream2 = processInt(flink.fromCollection(Seq(2, 3, 4, 6, 8, 13)))

    dataStream1.union(dataStream2).getSideOutput(ODD).map(elem => println(elem)) // this line fails

    val _ = flink.execute(JOB_NAME)
  }

  // Sends single-digit values to the EVEN or ODD side output and
  // forwards every value to the main output.
  def processInt(data: DataStream[Int]): DataStream[Int] = {
    data.process(new ProcessFunction[Int, Int] {
      override def processElement(value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]): Unit = {
        if (value % 2 == 0 && value < 10) {
          ctx.output(EVEN, value)
        } else if (value % 2 != 0 && value < 10) { // was `value % 1 == 0`, which is always true
          ctx.output(ODD, value)
        }
        out.collect(value)
      }
    })
  }
}

Best regards



Re: Union SingleOutputStreamOperator and getSideOutput doesn't work

Aljoscha Krettek
Hi,

I'm afraid you stumbled across an inconsistency in the API. In the Java
API we differentiate between DataStream and SingleOutputStreamOperator
where the latter is used for "physical" operations that, among other
things, allow things like getting side outputs.

The Scala API hides this difference but internally still must play by
the same rules.
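
To make the distinction concrete, here is a minimal sketch that inspects the Java stream wrapped by a Scala DataStream. The object name StreamKindCheck and the helper isOperator are made up for this illustration; javaStream is the accessor the Scala DataStream exposes for its underlying Java stream.

```scala
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
import org.apache.flink.streaming.api.scala._

// Hypothetical helper object, not part of Flink.
object StreamKindCheck {
  // True when the wrapped Java stream is a SingleOutputStreamOperator,
  // i.e. the kind of stream that getSideOutput() can be called on.
  def isOperator(stream: DataStream[_]): Boolean =
    stream.javaStream.isInstanceOf[SingleOutputStreamOperator[_]]

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val a = env.fromElements(1, 2, 3).map(_ + 1)
    val b = env.fromElements(4, 5, 6).map(_ + 1)

    // A map result is backed by a physical operator.
    println(s"map result is an operator:   ${isOperator(a)}")
    // A union is only a logical stream over its inputs.
    println(s"union result is an operator: ${isOperator(a.union(b))}")
  }
}
```

The check should come out differently for the two streams, which would explain why getSideOutput() on the unioned stream ends in a MatchError rather than a compile-time error: the Scala type is DataStream in both cases.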

In your case, you would have to get the side outputs individually from
the two operations that you union, and then union those results.
This is quite cumbersome, but the resulting runtime graph will be okay:
the union operation does not exist there, since it is only a logical
operation that affects how operators/tasks are wired together.
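
As a rough sketch of that workaround, based on the job from the original post: the tag id "odd", the job name, and the object name SideOutputThenUnion are assumptions, since the post does not show those definitions, and the EVEN branch is left out for brevity.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object SideOutputThenUnion {
  // Assumed definition; the original post does not show how ODD is declared.
  val ODD: OutputTag[Int] = OutputTag[Int]("odd")

  // Same shape as processInt in the original post, ODD branch only.
  def processInt(data: DataStream[Int]): DataStream[Int] =
    data.process(new ProcessFunction[Int, Int] {
      override def processElement(value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]): Unit = {
        if (value % 2 != 0 && value < 10) ctx.output(ODD, value)
        out.collect(value)
      }
    })

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val s1 = processInt(env.fromCollection(Seq(1, 3, 5, 7, 9, 12)))
    val s2 = processInt(env.fromCollection(Seq(2, 3, 4, 6, 8, 13)))

    // Take the side output from each process operator first, then union.
    val odd = s1.getSideOutput(ODD).union(s2.getSideOutput(ODD))
    odd.print()

    env.execute("side-output-then-union")
  }
}
```

The main outputs can still be unioned separately with s1.union(s2) if they are needed downstream.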

We could try and fix this in the Scala API by trying to do what I
suggested above underneath the covers in the getSideOutput() call. We
would have to "unwrap" the union, apply getSideOutput() individually and
then form the union of that and return it.
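
Until something like that exists in the API, the same idea can be approximated in user code. The sketch below is a hypothetical helper (unionSideOutputs is not part of Flink). Unlike the fix described above, it does not unwrap an existing union; the caller has to pass in the streams before they are unioned.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._

// Hypothetical user-level helper, not part of Flink.
object SideOutputUnion {
  // Applies getSideOutput(tag) to every input stream and unions the results.
  // Every input must come directly from an operator (e.g. process()),
  // otherwise getSideOutput fails in the same way as in the original post.
  def unionSideOutputs[X: TypeInformation](
      tag: OutputTag[X],
      first: DataStream[_],
      rest: DataStream[_]*): DataStream[X] = {
    val head = first.getSideOutput(tag)
    if (rest.isEmpty) head
    else head.union(rest.map(s => s.getSideOutput(tag)): _*)
  }
}
```

With the streams from the original post, this would be called as SideOutputUnion.unionSideOutputs(ODD, dataStream1, dataStream2).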

Best,
Aljoscha

On 18.11.20 11:09, Efrat Abramovitz wrote:

> Cheers,
>
>
> We have stumbled upon an issue regarding union streams after they have a tagged side output, it seems we cannot extract side output anymore.