Cheers,
We have run into an issue with unioned streams that carry a tagged side output: after the union, the side output can no longer be extracted.
Issue:
A SingleOutputStreamOperator stream ceases to be a SingleOutputStreamOperator after union, so getSideOutput can no longer be called on it.
In the Scala docs in particular, there is no indication that the type SingleOutputStreamOperator exists, or that it matters when using side outputs.
The documentation does not make clear under what conditions a data stream keeps its side outputs, and the code below fails with a surprising MatchError:
using scala
flink version: 1.9
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// The enclosing object and the three vals below were not part of the
// snippet as posted; the names and values are assumed.
object SideOutputUnion {
  val EVEN = OutputTag[Int]("even")
  val ODD = OutputTag[Int]("odd")
  val JOB_NAME = "side-output-union"

  def main(args: Array[String]): Unit = {
    val flink = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream1 = processInt(flink.fromCollection(Seq(1, 3, 5, 7, 9, 12)))
    val dataStream2 = processInt(flink.fromCollection(Seq(2, 3, 4, 6, 8, 13)))
    dataStream1.union(dataStream2).getSideOutput(ODD).map(elem => println(elem)) // this line fails
    val _ = flink.execute(JOB_NAME)
  }

  // Forwards every element to the main output; values below 10 are
  // additionally emitted to the EVEN or ODD side output.
  def processInt(data: DataStream[Int]): DataStream[Int] = {
    data.process(new ProcessFunction[Int, Int] {
      override def processElement(value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]): Unit = {
        if (value % 2 == 0 && value < 10) {
          ctx.output(EVEN, value)
        } else if (value % 2 != 0 && value < 10) {
          ctx.output(ODD, value)
        }
        out.collect(value)
      }
    })
  }
}
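For comparison, here is a minimal sketch of a variant that we would expect to work, assuming the side output has to be taken from each processed stream before the union rather than after it. The object name, tag ids and job name are made up for illustration, and this is not confirmed as the recommended approach:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Hypothetical object name; tag ids are assumptions for this sketch.
object SideOutputBeforeUnion {
  val EVEN: OutputTag[Int] = OutputTag[Int]("even")
  val ODD: OutputTag[Int] = OutputTag[Int]("odd")

  // Same logic as processInt above: every element goes to the main output,
  // values below 10 also go to the EVEN or ODD side output.
  def processInt(data: DataStream[Int]): DataStream[Int] =
    data.process(new ProcessFunction[Int, Int] {
      override def processElement(value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]): Unit = {
        if (value < 10) ctx.output(if (value % 2 == 0) EVEN else ODD, value)
        out.collect(value)
      }
    })

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val s1 = processInt(env.fromCollection(Seq(1, 3, 5, 7, 9, 12)))
    val s2 = processInt(env.fromCollection(Seq(2, 3, 4, 6, 8, 13)))

    // Take the side output while each stream is still the direct result
    // of process(), then union the two side-output streams.
    val odd: DataStream[Int] = s1.getSideOutput(ODD).union(s2.getSideOutput(ODD))
    odd.print()

    env.execute("side-output-before-union")
  }
}
```

The only change from the failing version is the order of the calls: getSideOutput is applied to the result of process on each input, and union is applied afterwards to the two side-output streams.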
Best regards