Cheers,
We have stumbled upon an issue with unioned streams that have tagged side outputs: after the union, it seems we can no longer extract the side output.
Issue: a SingleOutputStreamOperator stream ceases to be a SingleOutputStreamOperator after union, so getSideOutput cannot be called on it. In the Scala docs specifically, there is no indication of the existence or importance of the SingleOutputStreamOperator type when using side outputs.
According to the documentation, it is unclear under what conditions data streams keep their side outputs, and the code below fails with a surprising MatchError (using Scala, Flink version 1.9):
def processInt(data: DataStream[Int]): DataStream[Int] = {
  data.process(new ProcessFunction[Int, Int] {

Best regards
Hi,
I'm afraid you stumbled across an inconsistency in the API. In the Java API we differentiate between DataStream and SingleOutputStreamOperator, where the latter is used for "physical" operations that, among other things, allow getting side outputs. The Scala API hides this difference but internally still must play by the same rules.

In your case, you would have to get the side outputs individually from the two operations that you union and then union the results. This is quite cumbersome, but the resulting runtime graph will be okay. The union operation does not exist there, since it is only a logical operation that affects how operators/tasks are wired together.

We could try to fix this in the Scala API by doing what I suggested above under the covers in the getSideOutput() call. We would have to "unwrap" the union, apply getSideOutput() individually, and then form the union of the results and return it.

Best,
Aljoscha

On 18.11.20 11:09, Efrat Abramovitz wrote:
> Cheers,
>
> We have stumbled upon an issue regarding union streams after they have a tagged side output, it seems we cannot extract side output anymore.
>
> Issue:
>
> SingleOutputStreamOperator stream cease to be SingleOutputStreamOperator after union, and cannot perform getSideOutput.
>
> Specifically in Scala doc, there is no indication of the existing or importance of the type SingleOutputStreamOperator using side outputs
>
> According to documentation<https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html> it is unclear in what terms data streams keeps their side output, and the code below will fail with a surprising MatchError:
>
> using scala
>
> flink version: 1.9
>
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.functions.ProcessFunction
> import org.apache.flink.util.Collector
>
> def main(args: Array[String]): Unit = {
>   val flink = StreamExecutionEnvironment.getExecutionEnvironment
>   val dataStream1 = processInt(flink.fromCollection(Seq(1, 3, 5, 7, 9, 12)))
>   val dataStream2 = processInt(flink.fromCollection(Seq(2, 3, 4, 6, 8, 13)))
>
>   dataStream1.union(dataStream2).getSideOutput(ODD).map(elem => println(elem)) //this line fails
>
>   val _ = flink.execute(JOB_NAME)
> }
>
> def processInt(data: DataStream[Int]): DataStream[Int] = {
>   data.process(new ProcessFunction[Int, Int] {
>     override def processElement(value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]): Unit = {
>       if (value % 2 == 0 && value < 10) {
>         ctx.output(EVEN, value)
>       } else if (value % 1 == 0 && value < 10) {
>         ctx.output(ODD, value)
>       }
>       out.collect(value)
>     }
>   })
> }
> }
>
> Best regards
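
The workaround described in the reply (take the side output from each operator first, then union the results) might look roughly like the sketch below. It is an illustration under assumptions, not a confirmed fix: the definitions of EVEN and ODD are not shown in the original post, so the tag ids here are hypothetical, as are the object name and the job name. The even/odd branching is simplified but routes the same values as the posted code.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

object SideOutputUnionSketch {
  // Hypothetical tag definitions; the original post does not show them.
  val EVEN: OutputTag[Int] = OutputTag[Int]("even")
  val ODD: OutputTag[Int] = OutputTag[Int]("odd")

  // Same shape as processInt in the post: values below 10 are also
  // emitted to the EVEN or ODD side output; every value goes to the main output.
  def processInt(data: DataStream[Int]): DataStream[Int] =
    data.process(new ProcessFunction[Int, Int] {
      override def processElement(
          value: Int,
          ctx: ProcessFunction[Int, Int]#Context,
          out: Collector[Int]): Unit = {
        if (value < 10) {
          if (value % 2 == 0) ctx.output(EVEN, value) else ctx.output(ODD, value)
        }
        out.collect(value)
      }
    })

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val dataStream1 = processInt(env.fromCollection(Seq(1, 3, 5, 7, 9, 12)))
    val dataStream2 = processInt(env.fromCollection(Seq(2, 3, 4, 6, 8, 13)))

    // Call getSideOutput on each process() result individually,
    // then union the two side-output streams (not the main streams).
    val oddValues: DataStream[Int] =
      dataStream1.getSideOutput(ODD).union(dataStream2.getSideOutput(ODD))

    oddValues.print()

    env.execute("side-output-union-sketch") // hypothetical job name
  }
}
```

If the main outputs are needed as well, dataStream1.union(dataStream2) can still be built separately. The important point is that getSideOutput is called on the streams returned directly by process(), before any union.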