Great to hear that it is now working and thanks for letting the community know :-)

Yep, that was it! Thanks! And to complete the thread, this is the working revision.
package com.maalka.flink.sinks

import com.maalka.flink.models.{MaalkaDataRecord, SignableUpdate}
import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.connector.jdbc.internal.{GenericJdbcSinkFunction, JdbcBatchingOutputFormat}
import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import com.maalka.flink.typeInformation.Implicits._

import java.sql.PreparedStatement
import java.util.function.Function
object TestSink extends LazyLogging {

  // START HERE
  def process(
      messageStream: DataStream[MaalkaDataRecord],
      signableUpdateStream: Option[DataStream[SignableUpdate]],
      streamExecutionEnvironment: StreamExecutionEnvironment): Unit = {
    insertAnalyticData("raw", "insert into analytic_table ... ", messageStream.map(_ => "A"))
  }

  // It is required that you explicitly create a new JdbcStatementBuilder
  val statementBuilder: JdbcStatementBuilder[String] = new JdbcStatementBuilder[String] {
    override def accept(ps: PreparedStatement, t: String): Unit = {
      ps.setString(1, t)
    }
  }

  private def insertAnalyticData(
      interval: String,
      insertStatement: String,
      messageStream: DataStream[String]): Unit = {
    val connectionString = s"jdbc:postgresql://localhost/db"
    val sink: SinkFunction[String] = JdbcSink.sink(
      insertStatement,
      statementBuilder,
      JdbcExecutionOptions.builder()
        .withBatchIntervalMs(1000)
        .withBatchSize(1000)
        .withMaxRetries(10)
        .build,
      JdbcOptions.builder()
        .setDBUrl(connectionString)
        .setTableName("analytic_table")
        .build
    )

    messageStream
      .addSink(sink)
  }
}
I am not 100% sure, but maybe (_, _) => {} captures a reference to object TestSink, which is not serializable. Maybe try to simply define a no-op JdbcStatementBuilder and pass such an instance to JdbcSink.sink().
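For illustration, something along these lines might do (a minimal sketch only, using String as the element type and noOpStatementBuilder as a placeholder name):

import java.sql.PreparedStatement
import org.apache.flink.connector.jdbc.JdbcStatementBuilder

// no-op statement builder defined as an explicit instance rather than
// a Scala lambda, so the closure cleaner sees a plain serializable
// object instead of a lambda with a captured enclosing reference
val noOpStatementBuilder: JdbcStatementBuilder[String] =
  new JdbcStatementBuilder[String] {
    override def accept(ps: PreparedStatement, t: String): Unit = ()
  }

Passing such an instance to JdbcSink.sink() instead of the lambda should at least tell you whether the lambda's closure is the culprit.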
Cheers, Till
Ok, this is about as simple as I can get.

package com.maalka.flink.sinks
import com.maalka.flink.models.{MaalkaDataRecord, SignableUpdate}
import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.connector.jdbc.internal.{GenericJdbcSinkFunction, JdbcBatchingOutputFormat}
import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import com.maalka.flink.typeInformation.Implicits._

import java.util.function.Function
object TestSink extends LazyLogging {

  // START HERE
  def process(
      messageStream: DataStream[MaalkaDataRecord],
      signableUpdateStream: Option[DataStream[SignableUpdate]],
      streamExecutionEnvironment: StreamExecutionEnvironment): Unit = {
    insertAnalyticData("raw", "insert into analytic_table ... ", messageStream.map(_ => "A"))
  }

  private def insertAnalyticData(
      interval: String,
      insertStatement: String,
      messageStream: DataStream[String]): Unit = {
    val connectionString = s"jdbc:postgresql://localhost/db"
    val sink: SinkFunction[String] = JdbcSink.sink(
      insertStatement,
      (_, _) => {}, // I have a feeling that this is the lambda that can't serialize
      JdbcExecutionOptions.builder()
        .withBatchIntervalMs(1000)
        .withBatchSize(1000)
        .withMaxRetries(10)
        .build,
      JdbcOptions.builder()
        .setDBUrl(connectionString)
        .setTableName("analytic_table")
        .build
    )

    messageStream
      .addSink(sink)
  }
}
Hi Clay,
Could you maybe share the source code of com.maalka.flink.sinks.MaalkaPostgresSink with us? It seems that this sink uses a lambda which is not serializable. Maybe it holds a reference to some non-serializable class as part of its closure.
Cheers, Till
Thanks Till, the tickets and links were immensely useful. With that I was able to make progress and even get things to compile. However, when I run things a serialization exception is thrown (see below).

.addSink(JdbcSink.sink[SignableTableSchema](
  addIntervalToInsertStatement(insertStatement, interval),
  (ps: PreparedStatement, rd: SignableTableSchema) => {
    ps.setString(1, rd.data_processing_id)
    ps.setTimestamp(2, rd.crc)
    ps.setString(3, rd.command)
    ps.setString(4, rd.result)
    ps.setOptionalString(5, rd.message)
    ps.setString(6, rd.arguments)
    ps.setOptionalString(7, rd.validatorUUID)
  },
  getJdbcExecutionOptions,
  getJdbcOptions(interval, insertStatement) // <-- This is line 376
))
Where I set the executionOptions to write in a batched fashion:

def getJdbcExecutionOptions: JdbcExecutionOptions = {
  JdbcExecutionOptions.builder()
    .withBatchIntervalMs(1000)
    .withBatchSize(1000)
    .withMaxRetries(10)
    .build
}
Any suggestions?
[info] org.apache.flink.api.common.InvalidProgramException: The implementation of the AbstractJdbcOutputFormat is not serializable. The object probably contains or references non serializable fields.
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
[info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:203)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1243)
[info]   at org.apache.flink.streaming.api.scala.DataStream.addSink(DataStream.scala:1110)
[info]   at com.maalka.flink.sinks.MaalkaPostgresSink$.insertAnalyticData(MaalkaPostgresSink.scala:376)
[info]   at com.maalka.flink.sinks.MaalkaPostgresSink.process(MaalkaPostgresSink.scala:262)
[info]   at com.maalka.flink.sinks.MaalkaPostgresSink.process$(MaalkaPostgresSink.scala:250)
[info]   ...
[info] Cause: java.io.NotSerializableException: Non-serializable lambda
[info]   at com.maalka.flink.sinks.MaalkaPostgresSink$$$Lambda$22459/0x0000000809678c40.writeObject(Unknown Source)
[info]   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[info]   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[info]   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[info]   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
[info]   at java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1145)
[info]   at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1497)
[info]   at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
[info]   at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
[info]   at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379)
Hi Clay,
I am not a Table API expert but let me try to answer your question:
With FLINK-17748 [1] the community removed registerTableSink in favour of the connect API. The connect API has since been deprecated [2] because it was not well maintained. Now the recommended way of specifying sinks is Flink's DDL [3]. Unfortunately, I couldn't find an easy example of how to use the DDL. Maybe Timo or Jark can point you towards a good guide on how to register your JDBC table sink.
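The general shape, though, is roughly this (a sketch only; the table name, columns, and connection settings below are placeholders, not from your code):

// sketch only: register a JDBC sink table via DDL, then write to it;
// requires the flink-connector-jdbc dependency and the Postgres driver
tableEnv.executeSql(
  """CREATE TABLE maalka_jdbc_output_table (
    |  data_processing_id STRING,
    |  result STRING
    |) WITH (
    |  'connector' = 'jdbc',
    |  'url' = 'jdbc:postgresql://localhost/db',
    |  'table-name' = 'analytic_table'
    |)""".stripMargin)

// assuming resultTable is a Table holding the rows to write
resultTable.executeInsert("maalka_jdbc_output_table")

The CREATE TABLE plus executeInsert pair takes over the role that registerTableSink played in 1.10.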
Cheers, Till
Hey all. Hopefully this is an easy question. I'm porting my JDBC Postgres sink from 1.10 to 1.12.
I'm using:
* StreamTableEnvironment
* JdbcUpsertTableSink
What I'm having difficulty with is how to register the sink with the streaming table environment.
In 1.10:
tableEnv.registerTableSink(
  s"${interval}_maalka_jdbc_output_table",
  jdbcTableSink)
This method doesn't exist in 1.12; what is the equivalent?
Thanks! Clay