SideOutput Exception: "Output Tag must not be null"

Izual

I followed the docs [1] and SideOutputITCase.scala (a unit test case from flink-master), but encountered an exception:

Caused by: java.lang.IllegalArgumentException: OutputTag must not be null.

Code snippet, implemented in Scala:

```
  private final val backupOutputTag = OutputTag[String]("backup")

  val result = dataStream.assignAscendingTimestamps(_._3)
    .keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .sum(1)
    .process(new ProcessFunction[(String, Int, Long), (String, Int, Long)] {
      override def processElement(value: (String, Int, Long), ctx: ProcessFunction[(String, Int, Long), (String, Int, Long)]#Context, out: Collector[(String, Int, Long)]): Unit = {
        out.collect(value)
        ctx.output(backupOutputTag, s"backup:${value}")
      }
    })
```

My guess is that `backupOutputTag` was created on the JobManager, while `ctx.output(backupOutputTag)` is called on the TaskManager, so `backupOutputTag` is null there.

But the doc example suggests this approach is fine. What is the correct usage in Scala?

Flink version: 1.9.1

I posted a question here earlier but got no response: http://apache-flink.147419.n8.nabble.com/sideoutput-sql-state-td1533.html

Hoping for a response. Thanks.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/side_output.html





 

Re: SideOutput Exception: "Output Tag must not be null"

Arvid Heise-3
Hi Izual,

It seems the code example is not complete. I'm assuming `backupOutputTag` is actually a field of your application class.

If you look at the examples, you will notice that backupOutputTag should be defined within the method that defines your topology and not on the wrapping object.
So drop the private modifier and move the definition inside the function.
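
A minimal sketch of that suggestion might look like the following. It assumes Flink 1.9.x with the Scala DataStream API on the classpath. The object name `SideOutputJob`, the in-memory input elements, and the job name are hypothetical and only there to make the example complete; the pipeline itself is taken from the question. The only intended change is that `backupOutputTag` is a local `val` inside the method that builds the topology.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

// Hypothetical job object; the name is an assumption, not from the thread.
object SideOutputJob {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Defined locally, inside the method that builds the topology,
    // rather than as a private field on the wrapping object.
    val backupOutputTag = OutputTag[String]("backup")

    // Hypothetical sample input: (key, count, timestamp in milliseconds).
    val dataStream: DataStream[(String, Int, Long)] = env.fromElements(
      ("a", 1, 1000L),
      ("a", 2, 2000L),
      ("b", 3, 3000L)
    )

    val result = dataStream
      .assignAscendingTimestamps(_._3)
      .keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .sum(1)
      .process(new ProcessFunction[(String, Int, Long), (String, Int, Long)] {
        override def processElement(
            value: (String, Int, Long),
            ctx: ProcessFunction[(String, Int, Long), (String, Int, Long)]#Context,
            out: Collector[(String, Int, Long)]): Unit = {
          // Main output: forward the aggregated record unchanged.
          out.collect(value)
          // Side output: emit a backup string under the local tag.
          ctx.output(backupOutputTag, s"backup:$value")
        }
      })

    // Main stream and side output are read from the same operator result.
    result.print()
    result.getSideOutput(backupOutputTag).print()

    env.execute("side-output-example") // hypothetical job name
  }
}
```

The side output is read back with `getSideOutput` using the same tag instance that was passed to `ctx.output`.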