So using the JdbcInputFormat directly should work in the DataSet API.
Another option would be to export the data from JDBC, maybe to a CSV file.
Flink should support that.
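For reference, here is a minimal, untested sketch of such a direct read,
assuming the flink-connector-jdbc artifact that ships with 1.12. The driver,
URL, credentials, and column types are placeholders I made up, not values
from this thread:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.types.Row;

public class JdbcDirectRead {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Must match the columns of the SELECT below.
        RowTypeInfo rowType = new RowTypeInfo(
                BasicTypeInfo.STRING_TYPE_INFO,   // name
                BasicTypeInfo.STRING_TYPE_INFO);  // step

        JdbcInputFormat inputFormat = JdbcInputFormat.buildJdbcInputFormat()
                .setDrivername("org.postgresql.Driver")        // placeholder
                .setDBUrl("jdbc:postgresql://host:5432/mydb")  // placeholder
                .setUsername("user")                           // placeholder
                .setPassword("password")                       // placeholder
                .setQuery("SELECT name, step FROM my_table")
                .setRowTypeInfo(rowType)
                .finish();

        DataSet<Row> rows = env.createInput(inputFormat);
        rows.print();
    }
}

Note that without a parameter values provider this reads in a single split;
see the partitioned variant further down the thread.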
> I need to bootstrap a keyed process function.
>
> So, I was hoping to use the Table SQL API because I thought it could
> parallelize the work more efficiently via partitioning.
> I need to bootstrap keyed state for a keyed process function, with
> Flink 1.12.1, thus I think I am required to use the DataSet API.
>
> Is my only option JdbcInputFormat?
>
> ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
> BatchTableEnvironment batchTableEnv = BatchTableEnvironment.create(batchEnv);
> batchTableEnv.executeSql(
>     "CREATE TABLE my_table (" +
>     "  ...." +
>     ") WITH (" +
>     "  'connector.type' = 'jdbc'," +
>     "  'connector.url' = '?'," +
>     "  'connector.username' = '?'," +
>     "  'connector.password' = '?'," +
>     "  'connector.table' = 'my_table'" +
>     ")");
>
> Table table = batchTableEnv.sqlQuery("SELECT name, step FROM my_table");
> DataSet<Row> rowDataSet = batchTableEnv.toDataSet(table, Row.class);
> rowDataSet.print();
>
> This ends up throwing this exception:
>
> org.apache.flink.table.api.TableException: Only BatchTableSource and
> InputFormatTableSource are supported in BatchTableEnvironment.
> at org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:116)
> at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:580)
> at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:555)
> at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:537)
> at org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl.toDataSet(BatchTableEnvironmentImpl.scala:101)
>
> On Thu, Jun 17, 2021 at 12:51 AM Timo Walther <[hidden email]> wrote:
>
> Hi Marco,
>
> which operations do you want to execute in the bootstrap pipeline?
>
> Maybe you don't need to use SQL and the old planner at all. At least that
> would avoid the friction of going through another API layer.
>
> The JDBC connector can be used directly in the DataSet API as well.
>
> Regards,
> Timo
>
>
>
> On 17.06.21 07:33, Marco Villalobos wrote:
> > Thank you very much!
> >
> > I tried using Flink's SQL JDBC connector, and ran into issues.
> > According to the Flink documentation, only the old planner is
> > compatible with the DataSet API.
> >
> > When I connect to the table:
> >
> > CREATE TABLE my_table (
> > ....
> > ) WITH (
> > 'connector.type' = 'jdbc',
> > 'connector.url' = '?',
> > 'connector.username' = '?',
> > 'connector.password' = '?',
> > 'connector.table' = 'my_table'
> > )
> >
> > It creates a JdbcTableSource, but only BatchTableSource and
> > InputFormatTableSource are supported in BatchTableEnvironment.
> >
> > By the way, it was very challenging to figure out how to create that
> > connection string, because it's a different format than what is in the
> > documentation. I had to comb through JdbcTableSourceSinkFactory to
> > figure out how to connect.
> >
> > Is it even possible to use the DataSet API with the Table SQL API in
> > Flink 1.12.1?
> >
> >
> > On Wed, Jun 16, 2021 at 4:55 AM Robert Metzger
> <
[hidden email] <mailto:
[hidden email]>
> > <mailto:
[hidden email] <mailto:
[hidden email]>>> wrote:
> >
> > Hi Marco,
> >
> > The DataSet API will not run out of memory, as it spills to disk if
> > the data doesn't fit anymore.
> > Load is distributed by partitioning data.
> >
> > Giving you advice depends a bit on the use case. I would explore two
> > major options:
> > a) Reading the data from Postgres using Flink's SQL JDBC connector
> > [1]. 200 GB is not much data. A 1 Gbit/s network link needs roughly 30
> > minutes to transfer that (200 GB at 125 megabytes / second is about
> > 27 minutes).
> > b) Using the DataSet API and the State Processor API. I would first try
> > to see how much effort it is to read the data using the DataSet API
> > (it could be less convenient than the Flink SQL JDBC connector).
> >
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/jdbc/
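Regarding (b), here is a rough, untested sketch of what a State Processor API
bootstrap could look like in 1.12 on top of such a DataSet. The operator uid
"keyed-process-uid", the state name "step", and the bootstrapper logic are
placeholders I made up; they must match the keyed process function in the
actual streaming job:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.types.Row;

public class BootstrapKeyedState {

    // Writes one value per key into keyed state ("step" is a made-up state name).
    static class StepBootstrapper extends KeyedStateBootstrapFunction<String, Row> {
        private transient ValueState<String> stepState;

        @Override
        public void open(Configuration parameters) {
            stepState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("step", String.class));
        }

        @Override
        public void processElement(Row row, Context ctx) throws Exception {
            stepState.update((String) row.getField(1));
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // In practice this would come from the JDBC input format shown above.
        DataSet<Row> rows = env.fromElements(
                Row.of("name-1", "step-1"),
                Row.of("name-2", "step-2"));

        BootstrapTransformation<Row> transformation = OperatorTransformation
                .bootstrapWith(rows)
                .keyBy(new KeySelector<Row, String>() {
                    @Override
                    public String getKey(Row row) {
                        return (String) row.getField(0);  // keyed by "name"
                    }
                })
                .transform(new StepBootstrapper());

        // The uid must match the keyed process function's uid in the streaming job.
        Savepoint.create(new MemoryStateBackend(), 128)
                .withOperator("keyed-process-uid", transformation)
                .write("file:///tmp/bootstrap-savepoint");

        env.execute("bootstrap keyed state");
    }
}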
> >
> >
> > On Wed, Jun 16, 2021 at 6:50 AM Marco Villalobos <[hidden email]> wrote:
> >
> > I must bootstrap state from Postgres (approximately 200 GB of
> > data), and I notice that the State Processor API requires the
> > DataSet API in order to bootstrap state for the Stream API.
> >
> > I wish there was a way to use the SQL API and use a partitioned
> > scan, but I don't know if that is even possible with the DataSet
> > API.
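A partitioned scan is possible at the InputFormat level, for what it's worth:
the JDBC input format can split a numeric range across parallel instances via
a parameter values provider. A hedged sketch, assuming the flink-connector-jdbc
classes in 1.12 and a numeric id column; the column name, bounds, and batch
size are placeholders:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
import org.apache.flink.types.Row;

public class PartitionedJdbcRead {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        RowTypeInfo rowType = new RowTypeInfo(
                BasicTypeInfo.STRING_TYPE_INFO,   // name
                BasicTypeInfo.STRING_TYPE_INFO);  // step

        long minId = 0L;            // placeholder: SELECT min(id) FROM my_table
        long maxId = 100_000_000L;  // placeholder: SELECT max(id) FROM my_table

        JdbcInputFormat inputFormat = JdbcInputFormat.buildJdbcInputFormat()
                .setDrivername("org.postgresql.Driver")        // placeholder
                .setDBUrl("jdbc:postgresql://host:5432/mydb")  // placeholder
                .setUsername("user")                           // placeholder
                .setPassword("password")                       // placeholder
                // Each parallel split binds one (lower, upper) pair to the two '?'s.
                .setQuery("SELECT name, step FROM my_table WHERE id BETWEEN ? AND ?")
                .setParametersProvider(
                        new JdbcNumericBetweenParametersProvider(minId, maxId)
                                .ofBatchSize(1_000_000))
                .setRowTypeInfo(rowType)
                .finish();

        DataSet<Row> rows = env.createInput(inputFormat);
        System.out.println(rows.count());
    }
}

Each generated (lower, upper) pair becomes one input split, so the source can
actually run with parallelism greater than one.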
> >
> > I have never used the DataSet API, and I am unsure how it manages
> > memory or distributes load when handling large state.
> >
> > Would it run out of memory if I map data from a JDBCInputFormat
> > into a large DataSet and then use that to bootstrap state for my
> > stream job?
> >
> > Any advice on how I should proceed with this would be greatly
> > appreciated.
> >
> > Thank you.
> >
>