How do I register a streaming table sink in 1.12?

How do I register a streaming table sink in 1.12?

Clay Teeter
Hey all. Hopefully this is an easy question. I'm porting my JDBC Postgres sink from 1.10 to 1.12.

I'm using: 
* StreamTableEnvironment
* JdbcUpsertTableSink

What I'm having difficulty with is how to register the sink with the streaming table environment. 

In 1.10: 

    tableEnv.registerTableSink(
      s"${interval}_maalka_jdbc_output_table",
      jdbcTableSink)

This method doesn't exist in 1.12. What is the equivalent?

Thanks!
Clay

Re: How do I register a streaming table sink in 1.12?

Till Rohrmann
Hi Clay,

I am not a Table API expert but let me try to answer your question:

With FLINK-17748 [1] the community removed registerTableSink in favour of the connect API. The connect API has since been deprecated [2] because it was not well maintained. Now the recommended way to specify sinks is Flink's SQL DDL [3]. Unfortunately, I couldn't find an easy example of how to use the DDL; maybe Timo or Jark can point you towards a good guide on registering your JDBC table sink.
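In the meantime, untested, but I believe a DDL registration looks roughly like this (the columns, the primary key, and resultTable below are placeholders, not from your job; declaring a PRIMARY KEY is what makes the JDBC sink write in upsert mode):

    // Sketch only: register a Postgres sink table via DDL (Flink 1.12).
    // The sink.buffer-flush options mirror the old batching settings.
    tableEnv.executeSql(
      s"""CREATE TABLE ${interval}_maalka_jdbc_output_table (
         |  id      STRING,
         |  payload STRING,
         |  PRIMARY KEY (id) NOT ENFORCED
         |) WITH (
         |  'connector' = 'jdbc',
         |  'url' = 'jdbc:postgresql://localhost/db',
         |  'table-name' = 'analytic_table',
         |  'sink.buffer-flush.interval' = '1s',
         |  'sink.buffer-flush.max-rows' = '1000',
         |  'sink.max-retries' = '10'
         |)""".stripMargin)

    // Then write to it like any other table:
    resultTable.executeInsert(s"${interval}_maalka_jdbc_output_table")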


Cheers,
Till

Re: How do I register a streaming table sink in 1.12?

Clay Teeter
Thanks Till, the tickets and links were immensely useful. With that I was able to make progress and even get things to compile. However, when I run the job a serialization exception is thrown (see below).

    .addSink(JdbcSink.sink[SignableTableSchema](
      addIntervalToInsertStatement(insertStatement, interval),
      (ps: PreparedStatement, rd: SignableTableSchema) => {
        ps.setString(1, rd.data_processing_id)
        ps.setTimestamp(2, rd.crc)
        ps.setString(3, rd.command)
        ps.setString(4, rd.result)
        ps.setOptionalString(5, rd.message)
        ps.setString(6, rd.arguments)
        ps.setOptionalString(7, rd.validatorUUID)
      },
      getJdbcExecutionOptions,
      getJdbcOptions(interval, insertStatement) // <-- This is line 376
    ))
Where I set the executionOptions to batch the writes:
    def getJdbcExecutionOptions: JdbcExecutionOptions = {
      JdbcExecutionOptions.builder()
        .withBatchIntervalMs(1000)
        .withBatchSize(1000)
        .withMaxRetries(10)
        .build
    }

Any suggestions?

[info]   org.apache.flink.api.common.InvalidProgramException: The implementation of the AbstractJdbcOutputFormat is not serializable. The object probably contains or references non serializable fields.
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
[info]   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
[info]   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2000)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:203)
[info]   at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1243)
[info]   at org.apache.flink.streaming.api.scala.DataStream.addSink(DataStream.scala:1110)
[info]   at com.maalka.flink.sinks.MaalkaPostgresSink$.insertAnalyticData(MaalkaPostgresSink.scala:376)
[info]   at com.maalka.flink.sinks.MaalkaPostgresSink.process(MaalkaPostgresSink.scala:262)
[info]   at com.maalka.flink.sinks.MaalkaPostgresSink.process$(MaalkaPostgresSink.scala:250)
[info]   ...
[info]   Cause: java.io.NotSerializableException: Non-serializable lambda
[info]   at com.maalka.flink.sinks.MaalkaPostgresSink$$$Lambda$22459/0x0000000809678c40.writeObject(Unknown Source)
[info]   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[info]   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[info]   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[info]   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
[info]   at java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1145)
[info]   at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1497)
[info]   at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
[info]   at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
[info]   at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379)


Re: How do I register a streaming table sink in 1.12?

Till Rohrmann
Hi Clay,

could you maybe share the source code of com.maalka.flink.sinks.MaalkaPostgresSink with us? It seems that this sink uses a lambda which is not serializable. Maybe it holds a reference to some non-serializable class as part of its closure.

Cheers,
Till

Re: How do I register a streaming table sink in 1.12?

Clay Teeter
OK, this is about as simple as I can get it:
package com.maalka.flink.sinks

import com.maalka.flink.models.{MaalkaDataRecord, SignableUpdate}
import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.connector.jdbc.internal.{GenericJdbcSinkFunction, JdbcBatchingOutputFormat}
import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import com.maalka.flink.typeInformation.Implicits._

import java.util.function.Function

object TestSink extends LazyLogging {
  // START HERE
  def process(messageStream: DataStream[MaalkaDataRecord],
              signableUpdateStream: Option[DataStream[SignableUpdate]],
              streamExecutionEnvironment: StreamExecutionEnvironment): Unit = {

    insertAnalyticData("raw",
      "insert into analytic_table ... ",
      messageStream.map(_ => "A"))
  }

  private def insertAnalyticData(
      interval: String,
      insertStatement: String,
      messageStream: DataStream[String]): Unit = {
    val connectionString = s"jdbc:postgresql://localhost/db"
    val sink: SinkFunction[String] = JdbcSink.sink(
      insertStatement,
      (_, _) => {}, // I have a feeling that this is the lambda that can't serialize
      JdbcExecutionOptions.builder()
        .withBatchIntervalMs(1000)
        .withBatchSize(1000)
        .withMaxRetries(10)
        .build,
      JdbcOptions.builder()
        .setDBUrl(connectionString)
        .setTableName("analytic_table")
        .build
    )

    messageStream
      .addSink(sink)
  }
}

Re: How do I register a streaming table sink in 1.12?

Till Rohrmann
I am not 100% sure, but maybe (_, _) => {} captures a reference to object TestSink, which is not serializable. Maybe try to define a no-op JdbcStatementBuilder explicitly and pass that instance to JdbcSink.sink().
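Something like this, as an untested sketch (noOpBuilder is just an illustrative name):

    // Sketch: a no-op JdbcStatementBuilder defined as an explicit instance,
    // so it should not drag the enclosing object into its serialized closure.
    // (Needs: import java.sql.PreparedStatement)
    val noOpBuilder: JdbcStatementBuilder[String] =
      new JdbcStatementBuilder[String] {
        override def accept(ps: PreparedStatement, t: String): Unit = ()
      }

and then pass noOpBuilder as the second argument to JdbcSink.sink(...).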

Cheers,
Till

Re: How do I register a streaming table sink in 1.12?

Clay Teeter
Yep, that was it! Thanks! And to complete the thread, here is the working revision.

package com.maalka.flink.sinks

import com.maalka.flink.models.{MaalkaDataRecord, SignableUpdate}
import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.connector.jdbc.internal.{GenericJdbcSinkFunction, JdbcBatchingOutputFormat}
import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import com.maalka.flink.typeInformation.Implicits._

import java.sql.PreparedStatement
import java.util.function.Function

object TestSink extends LazyLogging {
  // START HERE
  def process(messageStream: DataStream[MaalkaDataRecord],
              signableUpdateStream: Option[DataStream[SignableUpdate]],
              streamExecutionEnvironment: StreamExecutionEnvironment): Unit = {

    insertAnalyticData("raw",
      "insert into analytic_table ... ",
      messageStream.map(_ => "A"))
  }

  // the JdbcStatementBuilder has to be created as an explicit instance
  // (a plain Scala lambda here fails to serialize)
  val statementBuilder: JdbcStatementBuilder[String] =
    new JdbcStatementBuilder[String] {
      override def accept(ps: PreparedStatement, t: String): Unit = {
        ps.setString(1, t)
      }
    }

  private def insertAnalyticData(
      interval: String,
      insertStatement: String,
      messageStream: DataStream[String]): Unit = {
    val connectionString = s"jdbc:postgresql://localhost/db"
    val sink: SinkFunction[String] = JdbcSink.sink(
      insertStatement,
      statementBuilder,
      JdbcExecutionOptions.builder()
        .withBatchIntervalMs(1000)
        .withBatchSize(1000)
        .withMaxRetries(10)
        .build,
      JdbcOptions.builder()
        .setDBUrl(connectionString)
        .setTableName("analytic_table")
        .build
    )

    messageStream
      .addSink(sink)
  }
}
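(Presumably the Scala lambda compiled to a class that captured the enclosing TestSink object in its closure, while this standalone JdbcStatementBuilder instance carries no such reference, so the ClosureCleaner can serialize it.)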



Re: How do I register a streaming table sink in 1.12?

Till Rohrmann
Great to hear that it is now working and thanks for letting the community know :-)
