JDBC Table and parameters provider


Flavio Pompermaier
Hi all,
we have a use case where we have a prepared statement that we parameterize using a custom parameters provider (similar to what happens in testJDBCInputFormatWithParallelismAndNumericColumnSplitting[1]).
How can we handle this using the JDBC table API?
What should we do to handle such a use case? Is there anyone willing to mentor us in its implementation?

Another question: why has flink-jdbc not been renamed to flink-connector-jdbc?

Thanks in advance,
Flavio

Re: JDBC Table and parameters provider

Jingsong Li
Hi,

Currently, JDBCTableSource.getInputFormat hard-codes the clause WHERE XXX BETWEEN ? AND ?, so we must use `NumericBetweenParametersProvider`.
I don't think this is a good long-term solution.
I think we should support filter push-down for JDBCTableSource, so that we can write whatever filters we want. What do you think?

Best,
Jingsong Lee
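
To illustrate the BETWEEN-based splitting mentioned in this reply, here is a minimal sketch in plain Java (no Flink dependency) of how a numeric range could be divided into (lower, upper) pairs for `WHERE XXX BETWEEN ? AND ?`. The class `BetweenRangeSketch`, its `split` method, and the column name `id` are hypothetical names for illustration; this is not the code of `NumericBetweenParametersProvider`, only the general idea behind it.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not a Flink class): splits [min, max] into inclusive sub-ranges. */
final class BetweenRangeSketch {

    /** Returns one {lower, upper} pair per split, to be bound to "BETWEEN ? AND ?". */
    static List<long[]> split(long min, long max, int numSplits) {
        if (numSplits < 1 || min > max) {
            throw new IllegalArgumentException("need numSplits >= 1 and min <= max");
        }
        long total = max - min + 1;          // number of values in the inclusive range
        long base = total / numSplits;       // minimum size of each split
        long remainder = total % numSplits;  // the first `remainder` splits get one extra value
        List<long[]> ranges = new ArrayList<>();
        long lower = min;
        for (int i = 0; i < numSplits && lower <= max; i++) {
            long size = base + (i < remainder ? 1 : 0);
            if (size == 0) {
                continue; // more splits requested than values available
            }
            long upper = lower + size - 1;
            ranges.add(new long[] {lower, upper});
            lower = upper + 1;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Each pair would be bound to one parallel instance of the prepared statement.
        for (long[] r : split(1, 10, 3)) {
            System.out.println("WHERE id BETWEEN " + r[0] + " AND " + r[1]);
        }
    }
}
```

Each pair feeds one parallel read, which is why the lower and upper bounds matter for parallelism independently of any filter push-down.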


Re: JDBC Table and parameters provider

Flavio Pompermaier
Maybe I am wrong, but supporting pushdown for JDBC is one thing (and probably useful), while parameters providers are required if you want to parallelize the fetch of the data.
You are not required to use NumericBetweenParametersProvider; you can use whichever ParametersProvider you prefer, depending on the statement you have.
Or do you have something else in mind?

Re: JDBC Table and parameters provider

Jingsong Li
Hi,

You are right about the lower and upper bounds: they are a must to parallelize the fetch of the data.
Filter pushdown, on the other hand, is used to filter more data on the JDBC server.

Yes, we can provide "scan.query.statement" and "scan.parameter.values.provider.class" for the JDBC connector. But maybe we need to be careful, because this API would be very flexible.

Can you tell us more about your case? Why can't it be solved by lower and upper bounds with filter pushdown?

Best,
Jingsong Lee

Re: JDBC Table and parameters provider

Flavio Pompermaier
Because in my use case the parallelism was not based on a range of keys/numbers but on a range of dates, so I needed a custom parameters provider.
As for pushdown, I don't know how Flink/Blink currently works. For example, let's say I have a Postgres catalog containing 2 tables (public.A and public.B).
If I run the following query: SELECT public.A.x, public.B.y FROM public.A JOIN public.B on public.A.pk = public.B.fk
Will this be pushed down as a single query, or will Flink fetch both tables and then perform the join?
Talking with Bowen, I understood that to avoid this I could define a VIEW in the DB (but this is not always possible) or in Flink (but from what I know this is very costly).
In this case a "scan.query.statement" parameter without a "scan.parameter.values.provider.class" would be super helpful and could improve performance a lot!
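
A date-based provider of the kind described here might look roughly like the following sketch. It is plain Java with no Flink dependency: the class `DateRangeParametersSketch`, the column name `event_date`, and the window size are all hypothetical, and the `Serializable[][]` return shape (one row of parameters per split) is only an assumption about what a custom parameters provider would hand to the prepared statement.

```java
import java.io.Serializable;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch (not a Flink class): one {from, to} date pair per split. */
final class DateRangeParametersSketch {

    /** Splits [start, end] (both inclusive) into windows of at most daysPerSplit days. */
    static Serializable[][] getParameterValues(LocalDate start, LocalDate end, int daysPerSplit) {
        if (daysPerSplit < 1 || start.isAfter(end)) {
            throw new IllegalArgumentException("need daysPerSplit >= 1 and start <= end");
        }
        List<Serializable[]> rows = new ArrayList<>();
        LocalDate from = start;
        while (!from.isAfter(end)) {
            LocalDate to = from.plusDays(daysPerSplit - 1L);
            if (to.isAfter(end)) {
                to = end; // clamp the last window to the requested end date
            }
            rows.add(new Serializable[] {from, to});
            from = to.plusDays(1);
        }
        return rows.toArray(new Serializable[0][]);
    }

    public static void main(String[] args) {
        Serializable[][] params =
                getParameterValues(LocalDate.of(2020, 1, 1), LocalDate.of(2020, 1, 10), 4);
        for (Serializable[] p : params) {
            System.out.println("WHERE event_date BETWEEN '" + p[0] + "' AND '" + p[1] + "'");
        }
    }
}
```

The point of the sketch is that the split logic is independent of the column type: only the way the ranges are computed changes compared with a numeric provider.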

Re: JDBC Table and parameters provider

Jingsong Li
Thanks for the explanation.
You can create a JIRA issue for this.

For "SELECT public.A.x, public.B.y FROM public.A JOIN public.B on public.A.pk = public.B.fk":
We cannot use a simple "scan.query.statement", because JDBCTableSource also handles projection pushdown, which means that the select part cannot be modified arbitrarily.
Maybe you can configure a rich table name for this.

Best,
Jingsong Lee

Re: JDBC Table and parameters provider

Flavio Pompermaier
Sorry Jingsong, but I didn't understand your reply. Can you please explain the following sentences better? I am probably missing some Table API background here (I have only used JDBCOutputFormat).
"We can not use a simple "scan.query.statement", because in JDBCTableSource, it also deal with project pushdown too. Which means that the select part can not be modified casual.
Maybe you can configure a rich table name for this."

I can take care of opening tickets but I need to understand exactly how many and I need to be sure of explaining the problem with the correct terms.

Best,
Flavio

Re: JDBC Table and parameters provider

Jingsong Li
Hi,

You can configure the table name for the JDBC source.
This table name can be a rich SQL expression: "(SELECT public.A.x, public.B.y FROM public.A JOIN public.B on public.A.pk = public.B.fk)"
The final scan query statement will then be: "select ... from (SELECT public.A.x, public.B.y FROM public.A JOIN public.B on public.A.pk = public.B.fk) where ..."

Why not use this rich SQL directly as the scan query statement? Because we have implemented projection pushdown [1] in JDBCTableSource,
which means the "select ..." part is dynamically generated by Flink SQL. We cannot set it statically.


Best,
Jingsong Lee
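
The interaction between a "rich" table name and projection pushdown can be sketched as follows. This is plain Java with no Flink dependency and is not the JDBCTableSource code: `ScanQuerySketch` and `buildScanQuery` are hypothetical names. The alias `AS ab` is an added assumption, since some databases require an alias for a subquery in FROM.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch (not Flink code): pushed-down projection over a derived table. */
final class ScanQuerySketch {

    /**
     * Builds "SELECT <columns> FROM <table> [WHERE <partitionColumn> BETWEEN ? AND ?]".
     * `table` may be a plain table name or a parenthesized subquery with an alias.
     */
    static String buildScanQuery(List<String> projectedColumns, String table, String partitionColumn) {
        if (projectedColumns.isEmpty()) {
            throw new IllegalArgumentException("at least one projected column is required");
        }
        StringBuilder sql = new StringBuilder("SELECT ")
                .append(String.join(", ", projectedColumns)) // chosen by the planner, not the user
                .append(" FROM ")
                .append(table);
        if (partitionColumn != null) {
            sql.append(" WHERE ").append(partitionColumn).append(" BETWEEN ? AND ?");
        }
        return sql.toString();
    }

    public static void main(String[] args) {
        String derived = "(SELECT public.A.x, public.B.y FROM public.A "
                + "JOIN public.B ON public.A.pk = public.B.fk) AS ab";
        // The same table definition yields different statements depending on the projection.
        System.out.println(buildScanQuery(Arrays.asList("x"), derived, null));
        System.out.println(buildScanQuery(Arrays.asList("x", "y"), derived, "x"));
    }
}
```

Because the outer column list changes with each Flink query, a fixed user-supplied statement cannot simply replace the generated one, whereas a subquery in the FROM position leaves the outer projection free to vary.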

Re: JDBC Table and parameters provider

Jingsong Li
> Specify "query" and "provider"
Yes, your proposal looks reasonable to me.
The keys can be "scan.***" like in [1].

> specify parameters
Maybe we need to add something like "scan.parametervalues.provider.type", which can be "bound", "specify", or "custom":
- when bound, use the old partitionLowerBound, partitionUpperBound and numPartitions
- when specify, use explicitly specified parameters, as in your proposal
- when custom, "scan.parametervalues.provider.class" is required

> not implement FilterableTableSource
Simply because we have not had time to finish it.


Best,
Jingsong Lee 
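
The three provider types proposed in this reply could be resolved from connector options roughly as in the sketch below. It is plain Java with no Flink dependency; `ProviderTypeSketch` is a hypothetical name, the option keys are the ones proposed in the thread (not existing connector options), and defaulting to "bound" when the type key is absent is an assumption made only for this example.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical sketch: resolves the proposed provider type from a map of options. */
final class ProviderTypeSketch {

    enum ProviderType { BOUND, SPECIFY, CUSTOM }

    // Keys as proposed in the thread; they are not existing connector options.
    static final String TYPE_KEY = "scan.parametervalues.provider.type";
    static final String CLASS_KEY = "scan.parametervalues.provider.class";

    static ProviderType resolve(Map<String, String> options) {
        // Assumption: fall back to the bound-based behaviour when no type is given.
        String raw = options.getOrDefault(TYPE_KEY, "bound");
        // valueOf throws IllegalArgumentException for unknown type names.
        ProviderType type = ProviderType.valueOf(raw.trim().toUpperCase(Locale.ROOT));
        if (type == ProviderType.CUSTOM && !options.containsKey(CLASS_KEY)) {
            throw new IllegalArgumentException("'" + CLASS_KEY + "' is required for type 'custom'");
        }
        return type;
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put(TYPE_KEY, "custom");
        options.put(CLASS_KEY, "com.example.MyDateProvider"); // hypothetical class name
        System.out.println(resolve(options));
    }
}
```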

On Wed, Apr 22, 2020 at 9:49 PM Flavio Pompermaier <[hidden email]> wrote:
Ok, now I understand your proposal. However, this looks like a workaround to me. I want to be able to give a name to such a table and also register it in a catalog if I want.
Indeed, my proposal is to add a "connector.read.query" as an alternative to "connector.table" (which forces you to map tables 1-to-1).
Then we can add a connector.read.parametervalues.provider.class in order to customize the splitting of the query (we can also add a check that the query contains at least 1 question mark).
If we introduce a custom parameters provider, we also need to specify parameters, using something like:
'connector.read.parametervalues.0.value' = '12/10/2019'
'connector.read.parametervalues.1.value' = '01/01/2020'
Another question: why does the JDBC table source not implement FilterableTableSource?
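
The indexed parameter keys and the question-mark check proposed in this message could be handled along these lines. This is a sketch in plain Java with no Flink dependency: `ReadQueryOptionsSketch` and its methods are hypothetical, the key names are the proposed ones rather than existing options, and the placeholder count is deliberately naive (it would also count a `?` inside a string literal).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: reads the proposed indexed parameter values and checks the query. */
final class ReadQueryOptionsSketch {

    static final String PREFIX = "connector.read.parametervalues.";
    static final String SUFFIX = ".value";

    /** Collects the values for indices 0, 1, 2, ... and stops at the first missing index. */
    static List<String> parameterValues(Map<String, String> options) {
        List<String> values = new ArrayList<>();
        for (int i = 0; options.containsKey(PREFIX + i + SUFFIX); i++) {
            values.add(options.get(PREFIX + i + SUFFIX));
        }
        return values;
    }

    /** Counts '?' characters; naive, since it does not skip string literals or comments. */
    static int countPlaceholders(String query) {
        int count = 0;
        for (int i = 0; i < query.length(); i++) {
            if (query.charAt(i) == '?') {
                count++;
            }
        }
        return count;
    }

    /** Rejects a query that cannot be parameterized at all. */
    static void requirePlaceholder(String query) {
        if (countPlaceholders(query) == 0) {
            throw new IllegalArgumentException("the query must contain at least one '?'");
        }
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put(PREFIX + 0 + SUFFIX, "12/10/2019");
        options.put(PREFIX + 1 + SUFFIX, "01/01/2020");
        requirePlaceholder("SELECT * FROM t WHERE d BETWEEN ? AND ?");
        System.out.println(parameterValues(options));
    }
}
```

The values are kept as raw strings here; how they would be converted to dates and turned into splits is left to the custom provider.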

On Wed, Apr 22, 2020 at 3:27 PM Jingsong Li <[hidden email]> wrote:
Hi,

Requirements: read data from "SELECT public.A.x, public.B.y FROM public.A JOIN public.B on public.A.pk = public.B.fk"

Solution: table name = "(SELECT public.A.x, public.B.y FROM public.A JOIN public.B on public.A.pk = public.B.fk)"

I don't see why there would be a 1-to-1 mapping between a Flink table and a JDBC table. If there were, there would be no way to support this requirement, because this Flink table comes from two JDBC tables.

Best,
Jingsong Lee

On Wed, Apr 22, 2020 at 8:42 PM Flavio Pompermaier <[hidden email]> wrote:
Sorry Jingsong, but I have to clarify this point, which is not clear at all to me.

From what I can see in the Table API documentation, there's currently no way to associate an SQL query with a Flink table: there's a 1-to-1 mapping between a Flink table and a JDBC table.
This means that, at the moment, if I want to join 2 tables from the same JDBC source (like in the example), Flink would fetch all the data of the 2 tables and then do the join; it would not execute the query directly and get the results back. Right?
If this is the case, we could open an issue for the Blink optimizer, which could improve performance by executing a query that involves a single JDBC source directly in the database. That's one point.
Or maybe this is what you were trying to say with "Which means the "select ..." is dynamically generated by the Flink sql. We can not set it static."? Does it mean that we can't specify a query in a JDBC table?
This seems to go against what you wrote in the statement before: So this table name can be a rich sql: "(SELECT public.A.x, public.B.y FROM public.A JOIN public.B on public.A.pk = public.B.fk)"

I didn't understand what your proposals are here. I see two issues:
  1. If a Flink table is mapped 1-to-1 with a JDBC table, are queries pushed down in a performant way?
    1. i.e. is SELECT public.A.x, public.B.y FROM public.A JOIN public.B on public.A.pk = public.B.fk performed efficiently in the DB, or is it performed in Flink after reading all the tables' data?
  2. Add a way to handle a custom parameter values provider class and query statements. What exactly is your proposal here?

Re: JDBC Table and parameters provider

Flavio Pompermaier
I've created 3 tickets related to this discussion; feel free to comment on them:

  1. https://issues.apache.org/jira/browse/FLINK-17358 - JDBCTableSource support FiltertableTableSource
  2. https://issues.apache.org/jira/browse/FLINK-17360 - Support custom partitioners in JDBCReadOptions

  3. https://issues.apache.org/jira/browse/FLINK-17361 - Support creating of a JDBC table using a custom query

Best,
Flavio

On Wed, Apr 22, 2020 at 4:29 PM Jingsong Li <[hidden email]> wrote:
> Specify "query" and "provider"
Yes, your proposal looks reasonable to me.
Key can be "scan.***" like in [1].

> specify parameters
Maybe we need add something like "scan.parametervalues.provider.type", it can be "bound, specify, custom":
- when bound, using old partitionLowerBound and partitionUpperBound, numPartitions
- when specify, using specify parameters like your proposal
- when custom, need "scan.parametervalues.provider.class"

> not implement FiltertableTableSource
Just because we have no time to finish it.


Best,
Jingsong Lee 

On Wed, Apr 22, 2020 at 9:49 PM Flavio Pompermaier <[hidden email]> wrote:
Ok, now I understand your proposal. However this looks like a workaround to me..I want to be able to give a name to such a table and register also to a catalog if I want.
Indeed my proposal is to add a "connector.read.query" as an alternative to "connector.table" (that forces you to map tables as 1-to-1).
Then we can add a connector.read.parametervalues.provider.class in order to customize the splitting of the query (we can also add a check that the query contains at least 1 question mark).
If we introduce a custom parameters provider we need also to specify parameters, using something like:
'connector.read.parametervalues.0.value'= '12/10/2019'
'connector.read.parametervalues.1.value'= '01/01/2020'
Another question: why JDBC table source does not implement FilterableTableSource?

On Wed, Apr 22, 2020 at 3:27 PM Jingsong Li <[hidden email]> wrote:
Hi,

Requirements: read data from "SELECT public.A.x, public.B.y FROM public.A JOIN public.B on public.A.pk = public.B.fk"

Solution: table name = "(SELECT public.A.x, public.B.y FROM public.A JOIN public.B on public.A.pk = public.B.fk)"

I don't why there's a 1-to-1 mapping between a Flink table and a JDBC table. If it is, there is no way support this requirement because this flink table is come from two jdbc tables.

Best,
Jingsong Lee

On Wed, Apr 22, 2020 at 8:42 PM Flavio Pompermaier <[hidden email]> wrote:
Sorry Jingsong, but I have to clarify this, because it is not clear at all to me.

From what I can see in the Table API documentation, there's currently no way to associate an SQL query with a Flink table: there's a 1-to-1 mapping between a Flink table and a JDBC table.
This means that, at the moment, if I want to join 2 tables from the same JDBC source (like in the example), Flink would fetch all the data of the 2 tables and then do the join itself; it would not execute the query directly on the database and get the results back. Right?
If this is the case, we could open an issue for the Blink optimizer: performance could improve if a query that involves a single JDBC source were executed directly on the database. That's one point.
Or maybe this is what you were trying to say with "Which means the "select ..." is dynamically generated by Flink SQL. We cannot set it statically."? Does it mean that we can't specify a query in a JDBC table?
This seems to contradict what you wrote in the statement before: So this table name can be a rich SQL: "(SELECT public.A.x, public.B.y FROM public.A JOIN public.B on public.A.pk = public.B.fk)"

I didn't understand what your proposals are here. I see two issues:
  1. If a Flink table is mapped 1-to-1 with a JDBC table, are queries pushed down in a performant way?
    1. i.e. is SELECT public.A.x, public.B.y FROM public.A JOIN public.B on public.A.pk = public.B.fk executed efficiently on the DB, or is it performed in Flink after reading all the tables' data?
  2. Add a way to handle a custom parameter values provider class and query statements. What exactly is your proposal here?

On Wed, Apr 22, 2020 at 1:03 PM Jingsong Li <[hidden email]> wrote:
Hi,

You can configure the table name for the JDBC source.
So this table name can be a rich SQL: "(SELECT public.A.x, public.B.y FROM public.A JOIN public.B on public.A.pk = public.B.fk)"
So the final scan query statement will be: "select ... from (SELECT public.A.x, public.B.y FROM public.A JOIN public.B on public.A.pk = public.B.fk) where ..."

Why not use this rich SQL as the scan query statement directly? Because we have implemented the project pushdown [1] in JDBCTableSource.
Which means the "select ..." is dynamically generated by Flink SQL. We cannot set it statically.
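
The way the rich table name combines with the dynamically generated select list can be sketched roughly as below. This is an illustration only, not the actual JDBCTableSource code: the class, method, and parameter names are hypothetical, and the projected fields are assumed to come from the planner.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: mimics how a scan query could be assembled around a table expression.
class ScanQuerySketch {
    // tableExpr is either a plain table name or a parenthesized subquery ("rich table name").
    // partitionColumn may be null when the read is not split into ranges.
    static String buildScanQuery(String tableExpr, List<String> projectedFields, String partitionColumn) {
        StringBuilder sql = new StringBuilder("SELECT ")
            .append(String.join(", ", projectedFields)) // select list comes from project pushdown
            .append(" FROM ")
            .append(tableExpr);
        if (partitionColumn != null) {
            // Placeholders are filled per split by a parameters provider.
            sql.append(" WHERE ").append(partitionColumn).append(" BETWEEN ? AND ?");
        }
        return sql.toString();
    }

    public static void main(String[] args) {
        String richTable =
            "(SELECT public.A.x, public.B.y FROM public.A JOIN public.B ON public.A.pk = public.B.fk)";
        // Only column x is requested, so only x appears in the outer select list.
        System.out.println(buildScanQuery(richTable, Arrays.asList("x"), null));
    }
}
```

Some databases may require an alias on a subquery used in FROM, in which case the rich table name would need one, for example "(...) AS t".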


Best,
Jingsong Lee

On Wed, Apr 22, 2020 at 6:49 PM Flavio Pompermaier <[hidden email]> wrote:
Sorry Jingsong, but I didn't understand your reply. Can you explain the following sentences better, please? I am probably missing some Table API background here (I have only used JDBCOutputFormat).
"We cannot use a simple "scan.query.statement", because JDBCTableSource also deals with project pushdown, which means that the select part cannot be modified casually.
Maybe you can configure a rich table name for this."

I can take care of opening tickets but I need to understand exactly how many and I need to be sure of explaining the problem with the correct terms.

Best,
Flavio

On Wed, Apr 22, 2020 at 11:52 AM Jingsong Li <[hidden email]> wrote:
Thanks for the explanation.
You can create a JIRA issue for this.

For "SELECT public.A.x, public.B.y FROM public.A JOIN public.B on public.A.pk = public.B.fk":
We cannot use a simple "scan.query.statement", because JDBCTableSource also deals with project pushdown, which means that the select part cannot be modified casually.
Maybe you can configure a rich table name for this.

Best,
Jingsong Lee

On Wed, Apr 22, 2020 at 5:24 PM Flavio Pompermaier <[hidden email]> wrote:
Because in my use case the parallelism was not based on a range of keys/numbers but on a range of dates, so I needed a custom Parameter Provider.
As for pushdown, I don't know how Flink/Blink currently works. For example, let's say I have a Postgres catalog containing 2 tables (public.A and public.B).
If I run the following query: SELECT public.A.x, public.B.y FROM public.A JOIN public.B on public.A.pk = public.B.fk
Will this be pushed down as a single query, or will Flink fetch both tables and then perform the join?
Talking with Bowen, I understood that to avoid this I could define a VIEW in the DB (but this is not always possible) or in Flink (but from what I know this is very costly).
In this case a parameter "scan.query.statement" without a "scan.parameter.values.provider.class" is super helpful and could improve performance a lot!
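
The date-based splitting mentioned at the start of this message could look roughly like the sketch below. The class name is hypothetical and it does not implement any Flink interface, so that it stays self-contained; the `getParameterValues()` method returning `Serializable[][]` is meant to mirror the shape of flink-jdbc's `ParameterValuesProvider`, which is an assumption to verify against the version in use.

```java
import java.io.Serializable;
import java.sql.Date;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

// Hypothetical provider: splits [start, end] into consecutive date windows,
// one (lower, upper) pair per split of a query like "... WHERE d BETWEEN ? AND ?".
class DateRangeParametersProvider {
    private final LocalDate start;
    private final LocalDate end;
    private final int daysPerSplit;

    DateRangeParametersProvider(LocalDate start, LocalDate end, int daysPerSplit) {
        if (end.isBefore(start)) {
            throw new IllegalArgumentException("end must not be before start");
        }
        if (daysPerSplit < 1) {
            throw new IllegalArgumentException("daysPerSplit must be at least 1");
        }
        this.start = start;
        this.end = end;
        this.daysPerSplit = daysPerSplit;
    }

    // One row per split; each row holds the two values bound to the two placeholders.
    public Serializable[][] getParameterValues() {
        List<Serializable[]> rows = new ArrayList<>();
        LocalDate lower = start;
        while (!lower.isAfter(end)) {
            LocalDate upper = lower.plusDays(daysPerSplit - 1);
            if (upper.isAfter(end)) {
                upper = end; // the last window may be shorter
            }
            rows.add(new Serializable[] {Date.valueOf(lower), Date.valueOf(upper)});
            lower = upper.plusDays(1); // BETWEEN is inclusive, so windows must not overlap
        }
        return rows.toArray(new Serializable[0][]);
    }
}
```

Each returned row would drive one parallel read, so the window size controls how many splits the query is divided into.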

On Wed, Apr 22, 2020 at 11:06 AM Jingsong Li <[hidden email]> wrote:
Hi,

You are right about the lower and upper bounds: they are a must for parallelizing the fetch of the data.
And filter pushdown is used to filter more data on the JDBC server.

Yes, we can provide "scan.query.statement" and "scan.parameter.values.provider.class" for the JDBC connector. But maybe we need to be careful, because this API is very flexible.

Can you tell us more about your case? Why can't it be solved by lower and upper bounds with filter pushdown?

Best,
Jingsong Lee

On Wed, Apr 22, 2020 at 4:45 PM Flavio Pompermaier <[hidden email]> wrote:
Maybe I am wrong, but supporting pushdown for JDBC is one thing (which is probably useful), while parameters providers are required if you want to parallelize the fetch of the data.
You are not required to use NumericBetweenParametersProvider; you can use the ParametersProvider you prefer, depending on the statement you have.
Or do you have in mind something else?

On Wed, Apr 22, 2020 at 10:33 AM Jingsong Li <[hidden email]> wrote:
Hi,

Right now, in JDBCTableSource.getInputFormat, it's written explicitly: WHERE XXX BETWEEN ? AND ?. So we must use `NumericBetweenParametersProvider`.
I don't think this is a good long-term solution.
I think we should support filter push-down for JDBCTableSource, so that we can write the filters we want. What do you think?

Best,
Jingsong Lee


On Tue, Apr 21, 2020 at 10:00 PM Flavio Pompermaier <[hidden email]> wrote:
Hi all,
we have a use case where we have a prepared statement that we parameterize using a custom parameters provider (similar to what happens in testJDBCInputFormatWithParallelismAndNumericColumnSplitting[1]).
How can we handle this using the JDBC table API?
What should we do to handle such a use case? Is there anyone willing to mentor us in its implementation?

Another question: why has flink-jdbc not been renamed to flink-connector-jdbc?

Thanks in advance,
Flavio



--
Best, Jingsong Lee


