Re: Union SingleOutputSteramOperator and getSideOutput doesn't work

Posted by Aljoscha Krettek on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Union-SingleOutputSteramOperator-and-getSideOutput-doesn-t-work-tp39571p39578.html

Hi,

I'm afraid you stumbled across an inconsistency in the API. In the Java
API we differentiate between DataStream and SingleOutputStreamOperator
where the latter is used for "physical" operations that, among other
things, allow things like getting side outputs.

The Scala API hides this difference but internally still must play by
the same rules.

In your case, you would have to get the side outputs individually from
the two operations that you union and then union the result of that one.
This is quite cumbersome but the resulting runtime graph will be okay.
The union operation does not exist there since it is only a logical
operation that affects how operators/tasks are wired together.

We could try and fix this in the Scala API by trying to do what I
suggested above underneath the covers in the getSideOutput() call. We
would have to "unwrap" the union, apply getSideOutput() individually and
then form the union of that and return it.

Best,
Aljoscha

On 18.11.20 11:09, Efrat Abramovitz wrote:

> Cheers,
>
>
> We have stumbled upon an issue regarding union streams after they have a tagged side output, it seems we cannot extract side output anymore.
>
>
> Issue:
>
> SingleOutputSteramOperator stream cease to be SingleOutputSteramOperator after union,  and cannot perform getSideOutput.
>
> Specifically in Scala doc, there is no indication of the existing or importance of the type SingleOutputSteramOperator using side outputs
>
>
> According to documentation<https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html> it is unclear in what terms data streams keeps their side output, and the code below will fail with a surprising MatchError:
>
> using scala
>
> flink version: 1.9
>
>
> import org.apache.flink.streaming.api.scala._
>
> import org.apache.flink.streaming.api.functions.ProcessFunction
> import org.apache.flink.util.Collector
>
> def main(args: Array[String]): Unit = {
>
>    val flink = StreamExecutionEnvironment.getExecutionEnvironment
>
>    val dataStream1 = processInt(flink.fromCollection(Seq(1, 3, 5, 7, 9, 12)))
>
>    val dataStream2 = processInt(flink.fromCollection(Seq(2, 3, 4, 6, 8, 13)))
>
>
>    dataStream1.union(dataStream2).getSideOutput(ODD).map(elem => println(elem)) //this line fails
>
>
>    val _ = flink.execute(JOB_NAME)
> }
>
> def processInt(data: DataStream[Int]): DataStream[Int] = {
>
>      data.process(new ProcessFunction[Int, Int] {
>        override def processElement(value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]): Unit = {
>          if (value % 2 == 0 && value < 10) {
>            ctx.output(EVEN, value)
>          } else if (value % 1 == 0 && value < 10) {
>            ctx.output(ODD, value)
>          }
>          out.collect(value)
>        }
>      })
>    }
> }
>
>
> Best regards
>
>
>
>