I followed docs[1] and SideOutputITCase.scala(unittest case from flink-master), but encountered an Exception:
Caused by: java.lang.IllegalArgumentException: OutputTag must not be null.
code snippet implemented by Scala
```
private final val backupOutputTag = OutputTag[String]("backup")
val result = dataStream.assignAscendingTimestamps(_._3)
.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.sum(1)
.process(new ProcessFunction[(String, Int, Long), (String, Int, Long)] {
override def processElement(value: (String, Int, Long), ctx: ProcessFunction[(String, Int, Long), (String, Int, Long)]#Context, out: Collector[(String, Int, Long)]): Unit = {
out.collect(value)
ctx.output(backupOutputTag, s"backup:${value}")
}
})
```
In my opinion, the reason is bcz `backupOutputTag` was created on JobManager, and `ctx.output(backupOutputTag)` was on TaskManager, so the `backupOutputTag` would be null.
But the doc example shows that way is ok, what's the correct usage in Scala?
flink-version:1.9.1
I post a question here but no resp:http://apache-flink.147419.n8.nabble.com/sideoutput-sql-state-td1533.html
Hope resp.Thanks.
1. https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/side_output.html