Re: Please advise bootstrapping large state

Posted by Marco Villalobos-2
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Please-advise-bootstrapping-large-state-tp44460p44500.html

Thank you very much! 

I tried using Flink's SQL JDBC connector and ran into issues. According to the Flink documentation, only the old planner is compatible with the DataSet API.

When I connect to the table:

CREATE TABLE my_table (
....
) WITH (
   'connector.type' = 'jdbc',
   'connector.url' = '?',
   'connector.username' = '?',
   'connector.password' = '?',
   'connector.table' = 'my_table'
)

It creates a JdbcTableSource, but only BatchTableSource and InputFormatTableSource are supported in BatchTableEnvironment.

By the way, it was very challenging to figure out how to create that connection string, because it's in a different format than what is in the documentation. I had to comb through JdbcTableSourceSinkFactory to figure out how to connect.

Is it even possible to use the DataSet API with the Table/SQL API in Flink 1.12.1?
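
A minimal sketch of the workaround I have in mind, which reads directly with JdbcInputFormat into a DataSet and bypasses the Table API entirely, assuming the flink-connector-jdbc artifact and a Postgres driver on the classpath (the URL, credentials, and column types below are placeholders):

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.types.Row;

public class ReadMyTable {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // The row type must match the SELECT list; LONG and STRING are placeholders.
        RowTypeInfo rowTypeInfo = new RowTypeInfo(
                BasicTypeInfo.LONG_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO);

        JdbcInputFormat inputFormat = JdbcInputFormat.buildJdbcInputFormat()
                .setDrivername("org.postgresql.Driver")
                .setDBUrl("jdbc:postgresql://localhost:5432/mydb")
                .setUsername("?")
                .setPassword("?")
                .setQuery("SELECT id, name FROM my_table")
                .setRowTypeInfo(rowTypeInfo)
                .finish();

        // No Table API involved, so the planner incompatibility goes away.
        DataSet<Row> rows = env.createInput(inputFormat);
        System.out.println(rows.count()); // triggers execution as a sanity check
    }
}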


On Wed, Jun 16, 2021 at 4:55 AM Robert Metzger <[hidden email]> wrote:
Hi Marco,

The DataSet API will not run out of memory, as it spills to disk if the data doesn't fit anymore.
Load is distributed by partitioning data.

Giving you advice depends a bit on the use-case. I would explore two major options:
a) reading the data from Postgres using Flink's SQL JDBC connector [1]. 200 GB is not much data; a 1 Gbit/s network link needs ~30 minutes to transfer it (at ~125 megabytes per second).
b) using the DataSet API and the State Processor API. I would first try to see how much effort it is to read the data using the DataSet API (it could be less convenient than the Flink SQL JDBC connector).
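
A minimal sketch of what option b) could look like with the State Processor API in Flink 1.12; the operator uid, state descriptor, paths, state backend, and row layout are placeholders and must match the streaming job being bootstrapped:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.types.Row;

public class BootstrapSavepoint {

    // Writes one ValueState entry per key; the state name and type must match
    // the descriptor used by the streaming job that restores this savepoint.
    static class NameBootstrapper extends KeyedStateBootstrapFunction<Long, Row> {
        private transient ValueState<String> lastName;

        @Override
        public void open(Configuration parameters) {
            lastName = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("lastName", String.class));
        }

        @Override
        public void processElement(Row row, Context ctx) throws Exception {
            lastName.update((String) row.getField(1));
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the JDBC read, e.g. env.createInput(jdbcInputFormat).
        DataSet<Row> rows = env.fromElements(Row.of(1L, "alice"), Row.of(2L, "bob"));

        BootstrapTransformation<Row> transformation = OperatorTransformation
                .bootstrapWith(rows)
                .keyBy(new KeySelector<Row, Long>() {
                    @Override
                    public Long getKey(Row row) {
                        return (Long) row.getField(0); // key column
                    }
                })
                .transform(new NameBootstrapper());

        // Backend and max parallelism should match the streaming job
        // (for 200 GB you would likely use RocksDB instead).
        Savepoint
                .create(new FsStateBackend("file:///tmp/state-backend"), 128)
                .withOperator("my-operator-uid", transformation)
                .write("file:///tmp/bootstrap-savepoint");

        env.execute("bootstrap savepoint");
    }
}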


On Wed, Jun 16, 2021 at 6:50 AM Marco Villalobos <[hidden email]> wrote:
I must bootstrap state from Postgres (approximately 200 GB of data), and I notice that the State Processor API requires the DataSet API in order to bootstrap state for the Stream API.

I wish there were a way to use the SQL API with a partitioned scan, but I don't know if that is even possible with the DataSet API.
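
A partitioned scan does seem possible at the DataSet level if the JDBC input format is given a parameter-values provider that splits the query into parallel BETWEEN ranges. A minimal sketch, assuming the flink-connector-jdbc artifact and a numeric, indexed id column (the bounds and batch size below are placeholders):

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;

public class PartitionedScan {
    public static JdbcInputFormat build() {
        return JdbcInputFormat.buildJdbcInputFormat()
                .setDrivername("org.postgresql.Driver")
                .setDBUrl("jdbc:postgresql://localhost:5432/mydb")
                .setUsername("?")
                .setPassword("?")
                // Each parallel split binds its own (lower, upper) pair.
                .setQuery("SELECT id, name FROM my_table WHERE id BETWEEN ? AND ?")
                // id range 0 .. 10_000_000, scanned in splits of 100_000 ids each.
                .setParametersProvider(
                        new JdbcNumericBetweenParametersProvider(0L, 10_000_000L)
                                .ofBatchSize(100_000L))
                .setRowTypeInfo(new RowTypeInfo(
                        BasicTypeInfo.LONG_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO))
                .finish();
    }
}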

I have never used the DataSet API, and I am unsure how it manages memory or distributes load when handling large state.

Would it run out of memory if I map data from a JDBCInputFormat into a large DataSet and then use that to bootstrap state for my stream job?

Any advice on how I should proceed with this would be greatly appreciated.

Thank you.