| 
					
	
	
	
	
				 | 
				
					
	
	
		Hi Guowei, 
 
 Thanks for pointing that out! It helps me resolve the issue. 
 
 Just a small correction: the `static` modifier is not available in Scala. The Scala alternative is to define the member in an `object`. 
 
 ```scala
 object BaseJob {
   final val LOG = LoggerFactory.getLogger(getClass)
 }
 ``` 
 
 I then reference `BaseJob.LOG` wherever I want to log. This solves my problem. 
 
 Thank you! 
 
 Best,
 Yik San

 Hi, Yik San 
 
 You need to change the following line:
  >>>>  protected final val LOG = LoggerFactory.getLogger(getClass)
 to:
  >>>>  protected static final val LOG = LoggerFactory.getLogger(getClass)
  
 
 
 Hi community, 
 
 
 
 `Job` extends `BaseJob`, which is where the `preprocess` and `process` methods are declared. 
 
 BaseJob.scala ```scala import org.apache.flink.streaming.api.datastream.DataStreamSink import org.apache.flink.streaming.api.functions.sink.DiscardingSink import org.apache.flink.streaming.api.scala.DataStream import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment import org.apache.flink.api.scala._ import org.slf4j.LoggerFactory
  trait BaseJob {   protected final val LOG = LoggerFactory.getLogger(getClass)
    def preprocess(tableEnv: StreamTableEnvironment): DataStream[AnyRef]
    def process(stream: DataStream[AnyRef]): DataStreamSink[AnyRef] = {     stream       .map { a =>         // This LOG line throws Task not serializable         // Commenting out the LOG line, then the LOG line in Job.scala works just fine         LOG.info("[BaseJob] a = " + a)         a       }       .addSink(new DiscardingSink[AnyRef])   } }
  ``` 
 
 Job.scala ```scala import org.apache.flink.api.common.JobExecutionResult import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
  object Job extends BaseJob {
    private val CreateSource =     """       |CREATE TABLE source (       |  a int       |) WITH (       |  'connector' = 'datagen',       |  'rows-per-second' = '5'       |)       |""".stripMargin
    private def run(): JobExecutionResult = {     val settings = EnvironmentSettings.newInstance.build     val execEnv: StreamExecutionEnvironment =       StreamExecutionEnvironment.getExecutionEnvironment     val tableEnv = StreamTableEnvironment.create(execEnv, settings)     val stream = preprocess(tableEnv)     process(stream)     execEnv.execute("Streaming")   }
    override def preprocess(tableEnv: StreamTableEnvironment): DataStream[AnyRef] = {     tableEnv.executeSql(CreateSource)     val table = tableEnv.sqlQuery("SELECT a FROM source")     tableEnv       .toDataStream(table)       .map {row =>         val a = row.getField("a")         // This LOG line works just fine!         LOG.info("[Job] a = " + a)         a       }   }
    def main(args: Array[String]): Unit = {     run()   } } ``` 
 
 Oddly, the LOG line in Job.scala works just fine, while the LOG line in BaseJob.scala fails with: 
 
 ``` Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)    at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)    at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:914)    at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:1189)    at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:623)    at BaseJob$class.process(BaseJob.scala:15)    at Job$.process(Job.scala:7)    at Job$.run(Job.scala:25)    at Job$.main(Job.scala:42)    at Job.main(Job.scala) Caused by: java.io.NotSerializableException: Job$    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1193)    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1579)    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1536)    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1444)    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1187)    at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:353)    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:406)    ... 10 more ``` 
 
 I wonder why this happens and how to resolve it, as I do want to log from within BaseJob. Thanks! 
 
 Best, Yik San  
  
  
	
	
	
	 
				 |