[Stateful Functions] JDBC Sink Problems

[Stateful Functions] JDBC Sink Problems

Jan Brusch
Hello,

we are currently trying to implement a JDBC Sink in Stateful Functions
as documented here:
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/io-module/flink-connectors.html

However, when starting the application, we run into the following error:

--------------------------------------------------------------------

java.lang.IllegalStateException: objects can not be reused with JDBC sink function
         at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
         at org.apache.flink.connector.jdbc.JdbcSink.lambda$sink$97f3ed45$1(JdbcSink.java:67) ~[?:?]
         at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.createAndOpenStatementExecutor(JdbcBatchingOutputFormat.java:131) ~[?:?]
         at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:113) ~[?:?]
         at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.open(GenericJdbcSinkFunction.java:50) ~[?:?]
         at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
         at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
         at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
         at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
         at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
         at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[flink-dist_2.12-1.11.3.jar:1.11.3]
         at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_275]
2021-02-04 13:59:49,121 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task 31284d56d1e2112b0f20099ee448a6a9_0.
2021-02-04 13:59:49,122 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 8 tasks should be restarted to recover the failed task 31284d56d1e2112b0f20099ee448a6a9_0.

-------------------------------------------------------------------

We tested the same sink in a regular Flink application under similar
circumstances (Protobuf objects etc.) and it works just fine. As an
attempted workaround we set the parameter "pipeline.object-reuse" to true
in the flink-conf.yaml of the Stateful Functions application, but that
had no effect on the error above. The Stateful Functions version is 2.2.2.
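
For reference, in a regular Flink job the same switch can also be toggled
programmatically on the ExecutionConfig; a minimal sketch of that (class name
and pipeline are illustrative only, this is not something StatefulFunctions
application code gets to do):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ObjectReuseExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // In a plain Flink job, object reuse can be switched off explicitly,
        // which is what the JdbcSink precondition in the stack trace expects.
        env.getConfig().disableObjectReuse();
        // ... build the pipeline with the JdbcSink here ...
    }
}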


Has anyone else seen this problem before?


Relevant Application Code:

-------- MyMessageSink.java -----

public class MyMessageSink {

    public static final EgressIdentifier<MyMessage> SINK_ID =
            new EgressIdentifier<>("mynamespace", "MyMessageSink", MyMessage.class);

    public EgressSpec<MyMessage> getEgressSpec() {
        JdbcConnectionOptions jdbcConnectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withDriverName("org.postgresql.Driver")
                .withUrl("jdbc:postgresql://localhost:5432/mydb?user=foo&password=bar")
                .build();

        JdbcStatementBuilder<MyMessage> jdbcStatementBuilder = (statementTemplate, myMessage) -> {
            statementTemplate.setString(1, myMessage.getFirstField());
            statementTemplate.setString(2, myMessage.getSecondField());
        };

        SinkFunction<MyMessage> sinkFunction = JdbcSink.sink(
                "INSERT INTO my_table (first_field, second_field) VALUES (?, ?) ON CONFLICT (first_field, second_field) DO NOTHING;",
                jdbcStatementBuilder,
                jdbcConnectionOptions
        );

        return new SinkFunctionSpec<>(SINK_ID, sinkFunction);
    }
}

---------------------------------------
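
As a side note, JdbcSink.sink() also has an overload taking JdbcExecutionOptions
to control batching; a sketch of how the sink above could be configured with it
(batch size, interval and retry values are purely illustrative):

import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;

// Sketch only: same SQL, statement builder and connection options as above,
// plus explicit execution options for batching behaviour.
SinkFunction<MyMessage> sinkFunction = JdbcSink.sink(
        "INSERT INTO my_table (first_field, second_field) VALUES (?, ?) ON CONFLICT (first_field, second_field) DO NOTHING;",
        jdbcStatementBuilder,
        JdbcExecutionOptions.builder()
                .withBatchSize(200)
                .withBatchIntervalMs(1000)
                .withMaxRetries(3)
                .build(),
        jdbcConnectionOptions
);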


----------- Module.java ---------------

...

MyMessageSink myMessageSink = new MyMessageSink();
binder.bindEgress(myMessageSink.getEgressSpec());

...

----------------------------------------------
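
For context, messages reach this egress from our functions via Context.send;
a minimal sketch of such a function (the function class and routing logic are
illustrative only):

import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.StatefulFunction;

// Sketch only: a function that forwards each incoming MyMessage to the JDBC egress.
public class MyForwardingFunction implements StatefulFunction {
    @Override
    public void invoke(Context context, Object input) {
        if (input instanceof MyMessage) {
            // Route the message to the egress bound in Module.java.
            context.send(MyMessageSink.SINK_ID, (MyMessage) input);
        }
    }
}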


Best regards,

Jan

--
neuland  – Büro für Informatik GmbH
Konsul-Smidt-Str. 8g, 28217 Bremen

Telefon (0421) 380107 57
Fax (0421) 380107 99
https://www.neuland-bfi.de

https://twitter.com/neuland
https://facebook.com/neulandbfi
https://xing.com/company/neulandbfi


Geschäftsführer: Thomas Gebauer, Jan Zander
Registergericht: Amtsgericht Bremen, HRB 23395 HB
USt-ID. DE 246585501

Re: [Stateful Functions] JDBC Sink Problems

Igal Shilman
Hi Jan,

StateFun enables object reuse automatically, and it cannot be disabled via configuration.
There is a technical reason for this, related to how we translate StateFun concepts into Flink concepts.
I've created an issue to remove this limitation [1].

I might be able to come up with a workaround in the next few days and let you know, if you are OK with building StateFun from source?
Otherwise, we will try to address this in the next StateFun release.

While we are on the topic of the JdbcSink: as far as I know, it doesn't support exactly-once.
If that is important to you, I would suggest simply emitting the inserts to Kafka and periodically
bulk-inserting them into the database.
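
A rough illustration of that route, reusing the generic SinkFunctionSpec egress
pattern from the thread but with a Kafka producer instead of the JDBC sink (the
egress identifier, topic name and broker address below are placeholders):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

// Sketch only: emit each MyMessage to a Kafka topic; a separate consumer or
// batch job would then periodically bulk-insert the records into the database.
EgressIdentifier<MyMessage> KAFKA_EGRESS_ID =
        new EgressIdentifier<>("mynamespace", "MyMessageKafkaEgress", MyMessage.class);

Properties producerConfig = new Properties();
producerConfig.setProperty("bootstrap.servers", "localhost:9092");

// MyMessage is a Protobuf message, so its wire bytes can go straight to Kafka.
SerializationSchema<MyMessage> serializer = message -> message.toByteArray();

SinkFunction<MyMessage> kafkaSink =
        new FlinkKafkaProducer<>("my-inserts-topic", serializer, producerConfig);

EgressSpec<MyMessage> kafkaEgressSpec = new SinkFunctionSpec<>(KAFKA_EGRESS_ID, kafkaSink);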

All the best,
Igal.



Re: [Stateful Functions] JDBC Sink Problems

Jan Brusch

Hi Igal,

thanks for the quick reply (as always) and the corresponding issue.

Building StateFun from source is potentially an option for us until the feature makes it into an upcoming release. If there is anything we can do to help with the issue, please let us know.

At-least-once is good enough for our use case, but thanks for the heads-up. We would prefer to sink the data directly from StateFun if at all possible, but if that doesn't work out, we will find another option (although sinking nested Protobuf messages from Kafka to SQL is not a trivial matter either... ;-)).


Best regards and a nice evening,

Jan

