Task not serializable when logging in a trait method

Task not serializable when logging in a trait method

Yik San Chan
Hi community,

I have a job that consumes data from a datagen source, tries to log something in `map` operators, and sinks the result to a DiscardingSink. The full example can be found in [the repo](https://github.com/YikSanChan/log-in-flink-operator).

The `Job` object extends the `BaseJob` trait, where the `preprocess` and `process` methods are defined.

BaseJob.scala
```scala
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.functions.sink.DiscardingSink
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.api.scala._
import org.slf4j.LoggerFactory

trait BaseJob {
  protected final val LOG = LoggerFactory.getLogger(getClass)

  def preprocess(tableEnv: StreamTableEnvironment): DataStream[AnyRef]

  def process(stream: DataStream[AnyRef]): DataStreamSink[AnyRef] = {
    stream
      .map { a =>
        // This LOG line throws Task not serializable.
        // If it is commented out, the LOG line in Job.scala works just fine.
        LOG.info("[BaseJob] a = " + a)
        a
      }
      .addSink(new DiscardingSink[AnyRef])
  }
}
```

Job.scala
```scala
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Job extends BaseJob {

  private val CreateSource =
    """
      |CREATE TABLE source (
      |  a int
      |) WITH (
      |  'connector' = 'datagen',
      |  'rows-per-second' = '5'
      |)
      |""".stripMargin

  private def run(): JobExecutionResult = {
    val settings = EnvironmentSettings.newInstance.build
    val execEnv: StreamExecutionEnvironment =
      StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(execEnv, settings)
    val stream = preprocess(tableEnv)
    process(stream)
    execEnv.execute("Streaming")
  }

  override def preprocess(tableEnv: StreamTableEnvironment): DataStream[AnyRef] = {
    tableEnv.executeSql(CreateSource)
    val table = tableEnv.sqlQuery("SELECT a FROM source")
    tableEnv
      .toDataStream(table)
      .map { row =>
        val a = row.getField("a")
        // This LOG line works just fine!
        LOG.info("[Job] a = " + a)
        a
      }
  }

  def main(args: Array[String]): Unit = {
    run()
  }
}
```

It is very odd that the LOG line in Job.scala works just fine, while the LOG line in BaseJob.scala fails with:

```
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:914)
at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:1189)
at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:623)
at BaseJob$class.process(BaseJob.scala:15)
at Job$.process(Job.scala:7)
at Job$.run(Job.scala:25)
at Job$.main(Job.scala:42)
at Job.main(Job.scala)
Caused by: java.io.NotSerializableException: Job$
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1193)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1579)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1536)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1444)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1187)
at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:353)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:406)
... 10 more
```

I wonder why this happens, and how to resolve it, as I do want to log in BaseJob. Thanks!

Best,
Yik San
Re: Task not serializable when logging in a trait method

Guowei Ma
Hi, Yik San

You need to change the following line:
>>>>  protected final val LOG = LoggerFactory.getLogger(getClass)
protected static final val LOG = LoggerFactory.getLogger(getClass)

Best,
Guowei


Re: Task not serializable when logging in a trait method

Yik San Chan
Hi Guowei,

Thanks for pointing that out! It helps me resolve the issue.

Just a small correction: the `static` modifier is not available in Scala; its alternative is a companion `object`.

```scala
object BaseJob {
  final val LOG = LoggerFactory.getLogger(getClass)
}
```

I then reference that `LOG` whenever I want to log. This solves my problem.
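
For reference, a minimal sketch of how the revised BaseJob.scala could look with the logger moved into the companion object (the rest of the trait is assumed unchanged):

```scala
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.functions.sink.DiscardingSink
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.slf4j.LoggerFactory

// The logger now lives in the companion object and is reachable statically,
// so the map lambda below no longer has to capture the enclosing
// (non-serializable) Job$ instance just to reach LOG.
object BaseJob {
  final val LOG = LoggerFactory.getLogger(getClass)
}

trait BaseJob {
  def preprocess(tableEnv: StreamTableEnvironment): DataStream[AnyRef]

  def process(stream: DataStream[AnyRef]): DataStreamSink[AnyRef] = {
    stream
      .map { a =>
        BaseJob.LOG.info("[BaseJob] a = " + a)
        a
      }
      .addSink(new DiscardingSink[AnyRef])
  }
}
```

Note that `getClass` inside the companion object resolves to the logger name `BaseJob$`; if the trait's name is preferred, `LoggerFactory.getLogger(classOf[BaseJob])` should give that instead.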

Thank you!

Best,
Yik San
