Alternatives to JDBCAppendTableSink in Flink 1.11


Alternatives to JDBCAppendTableSink in Flink 1.11

Sambaran
Hi,

I am currently using JDBCAppendTableSink to execute database stored procedures from Flink (version 1.7), populating external tables via a SingleOutputStreamOperator. We are now trying to upgrade to Flink 1.11 or later and found that JDBCAppendTableSink has been removed. While looking for an alternative, I could not find any suitable approach that would call a database stored procedure. Is there an alternative approach to resolve this?

Regards
Sambaran
Re: Alternatives to JDBCAppendTableSink in Flink 1.11

austin.ce
Hey Sambaran,

I'm not too familiar with the 1.7 JDBCAppendTableSink, but to make sure I understand what your current solution looks like: is it something like the following, where you're triggering a procedure on each element of a stream?

    JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
        .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
        .setDBUrl("jdbc:derby:memory:ebookshop")
        .setQuery("EXEC YourProcedure")
        .build();

    SingleOutputStreamOperator<Row> stream = env.fromElements(Row.of("a"), Row.of("b"));
    sink.emitDataStream(stream);

Or something else?

Best,
Austin




Re: Alternatives to JDBCAppendTableSink in Flink 1.11

austin.ce
Hi Sambaran,

I'm not sure if this is the best approach, since I don't know your full use case or implementation.

What kind of error do you get when trying to map into a PreparedStatement? I assume you tried something like this?

SingleOutputStreamOperator<Row> stream = env.fromElements(Row.of("YourProcedureA"), Row.of("YourProcedureB"));

stream.addSink(JdbcSink.sink(
    "EXEC ?",
    (preparedStatement, row) -> {
        // fill in `preparedStatement` from the row; note JDBC parameter indices start at 1
        preparedStatement.setString(1, (String) row.getField(0));
    },
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:derby:memory:ebookshop")
        .withDriverName("org.apache.derby.jdbc.EmbeddedDriver")
        .build()));
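If throughput matters, the sink can also take explicit batching settings. A sketch, assuming the Flink 1.11 `flink-connector-jdbc` artifact is on the classpath; the `JdbcExecutionOptions` values below are illustrative, not tuned for any workload:

```java
// Sketch only: same sink as above, plus explicit execution options for batching.
// JdbcExecutionOptions is part of the 1.11 flink-connector-jdbc module.
stream.addSink(JdbcSink.sink(
    "EXEC ?",
    // JDBC parameter indices start at 1
    (preparedStatement, row) -> preparedStatement.setString(1, (String) row.getField(0)),
    JdbcExecutionOptions.builder()
        .withBatchSize(100)        // flush after 100 buffered statements...
        .withBatchIntervalMs(200)  // ...or after 200 ms, whichever comes first
        .withMaxRetries(3)         // retry transient failures before failing the job
        .build(),
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:derby:memory:ebookshop")
        .withDriverName("org.apache.derby.jdbc.EmbeddedDriver")
        .build()));
```

Without this overload the connector's default batching applies, so statements may sit buffered until a flush is triggered; if each element must reach the database promptly, a batch size of 1 is the safe (if slower) choice.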

Best,
Austin

On Tue, Apr 20, 2021 at 12:42 PM Sambaran <[hidden email]> wrote:
Hi Austin,

We are using this for JDBC interfacing to populate Postgres tables based on the data coming from an event source.

We tried the approach mentioned in the doc https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/jdbc.html but did not find a suitable way to map SingleOutputStreamOperator<Row>. Could you let me know if this is the right approach and, if so, how we map the SingleOutputStreamOperator<Row> to the PreparedStatement in the JdbcStatementBuilder?

Thanks for your help!

Regards
Sambaran

On Tue, Apr 20, 2021 at 6:30 PM Austin Cawley-Edwards <[hidden email]> wrote:
Great, thanks for the clarification! I'm checking with others now. Are you using other parts of the Table/SQL APIs, or just this for JDBC interfacing?

Best,
Austin

On Tue, Apr 20, 2021 at 12:20 PM Sambaran <[hidden email]> wrote:
Hi Austin,

Thanks for replying. This is exactly as you mentioned here. Is there a way to execute the procedure with 1.11 or a later version, since JDBCAppendTableSink is no longer available there?

Regards
Sambaran

Re: Alternatives to JDBCAppendTableSink in Flink 1.11

Sambaran
Hi Austin,

Many thanks, we were indeed using the API incorrectly. In local tests we can now see the data being populated in Postgres.

Have a nice day!

Regards
Sambaran

Re: Alternatives to JDBCAppendTableSink in Flink 1.11

austin.ce
Great to hear!

Austin
