I must bootstrap state from postgres (approximately 200 GB of data) and I notice that the state processor API requires the DataSet API in order to bootstrap state for the Stream API.
I wish there was a way to use the SQL API and use a partitioned scan, but I don't know if that is even possible with the DataSet API. I never used the DataSet API, and I am unsure how it manages memory, or distributes load, when handling large state. Would it run out of memory if I map data from a JDBCInputFormat into a large DataSet and then use that to bootstrap state for my stream job? Any advice on how I should proceed with this would be greatly appreciated. Thank you. |
Hi Marco, The DataSet API will not run out of memory, as it spills to disk if the data doesn't fit anymore. Load is distributed by partitioning data. Giving you advice depends a bit on the use-case. I would explore two major options: a) reading the data from postgres using Flink's SQL JDBC connector [1]. 200 GB is not much data. A 1gb network link needs ~30 minutes to transfer that (125 megabytes / second) b) Using the DataSet API and state processor API. I would first try to see how much effort it is to read the data using the DataSet API (could be less convenient than the Flink SQL JDBC connector). On Wed, Jun 16, 2021 at 6:50 AM Marco Villalobos <[hidden email]> wrote:
|
Thank you very much! I tried using Flink's SQL JDBC connector, and ran into issues. According to the flink documentation, only the old planner is compatible with the DataSet API. When I connect to the table: CREATE TABLE my_table ( .... ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = '?', 'connector.username' = '?', 'connector.password' = '?', 'connector.table' = 'my_table' ) It creates a JdbcTableSource, but only BatchTableSource and InputFormatTableSource are supported in BatchTableEnvironment. By the way, it was very challenging to figure out how to create that connection string, because its a different format than what is in the documentation. I had to comb through JdbcTableSourceSinkFactory to figure out how to connect. Is it even possible to use the DataSet API with the Table SQL api in Flink 1.12.1? On Wed, Jun 16, 2021 at 4:55 AM Robert Metzger <[hidden email]> wrote:
|
Hi Marco,
which operations do you want to execute in the bootstrap pipeline? Maybe you don't need to use SQL and old planner. At least this would simplify the friction by going through another API layer. The JDBC connector can be directly be used in DataSet API as well. Regards, Timo On 17.06.21 07:33, Marco Villalobos wrote: > Thank you very much! > > I tried using Flink's SQL JDBC connector, and ran into issues. > According to the flink documentation, only the old planner is compatible > with the DataSet API. > > When I connect to the table: > > CREATE TABLE my_table ( > .... > ) WITH ( > 'connector.type' = 'jdbc', > 'connector.url' = '?', > 'connector.username' = '?', > 'connector.password' = '?', > 'connector.table' = 'my_table' > ) > > It creates a JdbcTableSource, but only BatchTableSource and > InputFormatTableSource are supported in BatchTableEnvironment. > > By the way, it was very challenging to figure out how to create that > connection string, because its a different format than what is in the > documentation. I had to comb through JdbcTableSourceSinkFactory to > figure out how to connect. > > Is it even possible to use the DataSet API with the Table SQL api in > Flink 1.12.1? > > > On Wed, Jun 16, 2021 at 4:55 AM Robert Metzger <[hidden email] > <mailto:[hidden email]>> wrote: > > Hi Marco, > > The DataSet API will not run out of memory, as it spills to disk if > the data doesn't fit anymore. > Load is distributed by partitioning data. > > Giving you advice depends a bit on the use-case. I would explore two > major options: > a) reading the data from postgres using Flink's SQL JDBC connector > [1]. 200 GB is not much data. A 1gb network link needs ~30 minutes > to transfer that (125 megabytes / second) > b) Using the DataSet API and state processor API. I would first try > to see how much effort it is to read the data using the DataSet API > (could be less convenient than the Flink SQL JDBC connector). > > [1] > https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/jdbc/ > <https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/jdbc/> > > > On Wed, Jun 16, 2021 at 6:50 AM Marco Villalobos > <[hidden email] <mailto:[hidden email]>> wrote: > > I must bootstrap state from postgres (approximately 200 GB of > data) and I notice that the state processor API requires the > DataSet API in order to bootstrap state for the Stream API. > > I wish there was a way to use the SQL API and use a partitioned > scan, but I don't know if that is even possible with the DataSet > API. > > I never used the DataSet API, and I am unsure how it manages > memory, or distributes load, when handling large state. > > Would it run out of memory if I map data from a JDBCInputFormat > into a large DataSet and then use that to bootstrap state for my > stream job? > > Any advice on how I should proceed with this would be greatly > appreciated. > > Thank you. > |
I need to bootstrap a keyed process function. So, I was hoping to use the Table SQL API because I thought it could parallelize the work more efficiently via partitioning. I need to boot strap keyed state for a keyed process function, with Flnk 1.12.1, thus I think I am required to use the DataSet API. Is my only option JdbcInputFormat? ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment batchTableEnv = BatchTableEnvironment.create(batchEnv); batchTableEnv.executeSql(" CREATE TABLE my_table ( .... ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = '?', 'connector.username' = '?', 'connector.password' = '?', 'connector.table' = 'my_table' )"); Table table = batchTableEnv.sqlQuery("SELECT name, step FROM my_table"); DataSet<Row> rowDataSet = batchTableEnv.toDataSet(table, Row.class); rowDataSet.print(); This ends up throwing this exception: org.apache.flink.table.api.TableException: Only BatchTableSource and InputFormatTableSource are supported in BatchTableEnvironment. at org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:116) at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:580) at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:555) at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:537) at org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl.toDataSet(BatchTableEnvironmentImpl.scala:101) On Thu, Jun 17, 2021 at 12:51 AM Timo Walther <[hidden email]> wrote: Hi Marco, |
Hi Marco,
as Robert already mentioned, the BatchTableEnvironment is simply build on top of the DataSet API, partitioning functionality is also available in DataSet API. So using the JdbcInputFormat directly should work in DataSet API. Otherwise I would recommend to use some initial pipeline to transfer the data from JDBC maybe to a CSV file. Flink should support that. Regards, Timo On 17.06.21 17:43, Marco Villalobos wrote: > I need to bootstrap a keyed process function. > > So, I was hoping to use the Table SQL API because I thought it could > parallelize the work more efficiently via partitioning. > I need to boot strap keyed state for a keyed process function, with > Flnk 1.12.1, thus I think I am required to use the DataSet API. > > Is my only option JdbcInputFormat? > > ExecutionEnvironment batchEnv = > ExecutionEnvironment.getExecutionEnvironment(); > BatchTableEnvironment batchTableEnv = > BatchTableEnvironment.create(batchEnv); > batchTableEnv.executeSql(" > CREATE TABLE my_table ( > .... > ) WITH ( > 'connector.type' = 'jdbc', > 'connector.url' = '?', > 'connector.username' = '?', > 'connector.password' = '?', > 'connector.table' = 'my_table' > )"); > > Table table = batchTableEnv.sqlQuery("SELECT name, step FROM my_table"); > DataSet<Row> rowDataSet = batchTableEnv.toDataSet(table, Row.class); > rowDataSet.print(); > > This ends up throwing this exception: > > org.apache.flink.table.api.TableException: Only BatchTableSource and > InputFormatTableSource are supported in BatchTableEnvironment. > at > org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:116) > at > org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:580) > at > org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:555) > at > org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:537) > at > org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl.toDataSet(BatchTableEnvironmentImpl.scala:101) > > On Thu, Jun 17, 2021 at 12:51 AM Timo Walther <[hidden email] > <mailto:[hidden email]>> wrote: > > Hi Marco, > > which operations do you want to execute in the bootstrap pipeline? > > Maybe you don't need to use SQL and old planner. At least this would > simplify the friction by going through another API layer. > > The JDBC connector can be directly be used in DataSet API as well. > > Regards, > Timo > > > > On 17.06.21 07:33, Marco Villalobos wrote: > > Thank you very much! > > > > I tried using Flink's SQL JDBC connector, and ran into issues. > > According to the flink documentation, only the old planner is > compatible > > with the DataSet API. > > > > When I connect to the table: > > > > CREATE TABLE my_table ( > > .... > > ) WITH ( > > 'connector.type' = 'jdbc', > > 'connector.url' = '?', > > 'connector.username' = '?', > > 'connector.password' = '?', > > 'connector.table' = 'my_table' > > ) > > > > It creates a JdbcTableSource, but only BatchTableSource and > > InputFormatTableSource are supported in BatchTableEnvironment. > > > > By the way, it was very challenging to figure out how to create that > > connection string, because its a different format than what is in > the > > documentation. I had to comb through JdbcTableSourceSinkFactory to > > figure out how to connect. > > > > Is it even possible to use the DataSet API with the Table SQL api in > > Flink 1.12.1? > > > > > > On Wed, Jun 16, 2021 at 4:55 AM Robert Metzger > <[hidden email] <mailto:[hidden email]> > > <mailto:[hidden email] <mailto:[hidden email]>>> wrote: > > > > Hi Marco, > > > > The DataSet API will not run out of memory, as it spills to > disk if > > the data doesn't fit anymore. > > Load is distributed by partitioning data. > > > > Giving you advice depends a bit on the use-case. I would > explore two > > major options: > > a) reading the data from postgres using Flink's SQL JDBC > connector > > [1]. 200 GB is not much data. A 1gb network link needs ~30 > minutes > > to transfer that (125 megabytes / second) > > b) Using the DataSet API and state processor API. I would > first try > > to see how much effort it is to read the data using the > DataSet API > > (could be less convenient than the Flink SQL JDBC connector). > > > > [1] > > > https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/jdbc/ > <https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/jdbc/> > > > <https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/jdbc/ <https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/jdbc/>> > > > > > > On Wed, Jun 16, 2021 at 6:50 AM Marco Villalobos > > <[hidden email] <mailto:[hidden email]> > <mailto:[hidden email] > <mailto:[hidden email]>>> wrote: > > > > I must bootstrap state from postgres (approximately 200 GB of > > data) and I notice that the state processor API requires the > > DataSet API in order to bootstrap state for the Stream API. > > > > I wish there was a way to use the SQL API and use a > partitioned > > scan, but I don't know if that is even possible with the > DataSet > > API. > > > > I never used the DataSet API, and I am unsure how it manages > > memory, or distributes load, when handling large state. > > > > Would it run out of memory if I map data from a > JDBCInputFormat > > into a large DataSet and then use that to bootstrap state > for my > > stream job? > > > > Any advice on how I should proceed with this would be greatly > > appreciated. > > > > Thank you. > > > |
It was not clear to me that JdbcInputFormat was part of the DataSet api. Now I understand. Thank you. On Fri, Jun 18, 2021 at 5:23 AM Timo Walther <[hidden email]> wrote: Hi Marco, |
Free forum by Nabble | Edit this page |