Hi Guowei,
Thanks for pointing that out! It helped me resolve the issue.
Just a small correction: the `static` modifier is not available in Scala; its Scala alternative is an `object`:
```scala
object BaseJob {
  final val LOG = LoggerFactory.getLogger(getClass)
}
```
Then I reference `LOG` from that object whenever I want to log. This solves my problem.
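For reference, here is a minimal sketch of the resulting trait (assuming the same Flink and SLF4J setup as in the original code further down the thread). Since `LOG` now lives in the companion object, the map closure references the object rather than `this`:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.functions.sink.DiscardingSink
import org.apache.flink.streaming.api.scala.DataStream
import org.slf4j.LoggerFactory

// Companion object: the logger is a member of BaseJob$, not of any job instance.
object BaseJob {
  final val LOG = LoggerFactory.getLogger(getClass)
}

trait BaseJob {
  def process(stream: DataStream[AnyRef]): DataStreamSink[AnyRef] = {
    stream
      .map { a =>
        // References the companion object, not `this`, so the closure stays serializable.
        BaseJob.LOG.info("[BaseJob] a = " + a)
        a
      }
      .addSink(new DiscardingSink[AnyRef])
  }
}
```

The key difference is that serializing the map function no longer drags in the enclosing `Job$` instance, which is exactly what the `NotSerializableException: Job$` below complains about.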
Thank you!
Best,
Yik San

Hi, Yik San
You need to change the following line from:
```
protected final val LOG = LoggerFactory.getLogger(getClass)
```
to:
```
protected static final val LOG = LoggerFactory.getLogger(getClass)
```
Hi community,
The `Job` object extends the `BaseJob` trait, where the `preprocess` and `process` methods are defined.
BaseJob.scala
```scala
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.functions.sink.DiscardingSink
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.api.scala._
import org.slf4j.LoggerFactory

trait BaseJob {
  protected final val LOG = LoggerFactory.getLogger(getClass)

  def preprocess(tableEnv: StreamTableEnvironment): DataStream[AnyRef]

  def process(stream: DataStream[AnyRef]): DataStreamSink[AnyRef] = {
    stream
      .map { a =>
        // This LOG line throws "Task not serializable".
        // If it is commented out, the LOG line in Job.scala works just fine.
        LOG.info("[BaseJob] a = " + a)
        a
      }
      .addSink(new DiscardingSink[AnyRef])
  }
}
```
Job.scala
```scala
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Job extends BaseJob {

  private val CreateSource =
    """
      |CREATE TABLE source (
      |  a int
      |) WITH (
      |  'connector' = 'datagen',
      |  'rows-per-second' = '5'
      |)
      |""".stripMargin

  private def run(): JobExecutionResult = {
    val settings = EnvironmentSettings.newInstance.build
    val execEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(execEnv, settings)
    val stream = preprocess(tableEnv)
    process(stream)
    execEnv.execute("Streaming")
  }

  override def preprocess(tableEnv: StreamTableEnvironment): DataStream[AnyRef] = {
    tableEnv.executeSql(CreateSource)
    val table = tableEnv.sqlQuery("SELECT a FROM source")
    tableEnv
      .toDataStream(table)
      .map { row =>
        val a = row.getField("a")
        // This LOG line works just fine!
        LOG.info("[Job] a = " + a)
        a
      }
  }

  def main(args: Array[String]): Unit = {
    run()
  }
}
```
It is very odd that the LOG line in Job.scala works just fine, while the LOG line in BaseJob.scala fails with:
```
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
    at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
    at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:914)
    at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:1189)
    at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:623)
    at BaseJob$class.process(BaseJob.scala:15)
    at Job$.process(Job.scala:7)
    at Job$.run(Job.scala:25)
    at Job$.main(Job.scala:42)
    at Job.main(Job.scala)
Caused by: java.io.NotSerializableException: Job$
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1193)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1579)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1536)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1444)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1187)
    at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:353)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:406)
    ... 10 more
```
I wonder why this happens, and how to resolve it, as I do want to log in BaseJob. Thanks!
Best,
Yik San