DynamicTableSink's equivalent of UpsertStreamTableSink.setKeyFields


Yuval Itzchakov
Hi,
I'm reworking an existing UpsertStreamTableSink into the new DynamicTableSink API. In the previous API, one would get the unique keys for upsert queries via the `setKeyFields` method, which would calculate them based on the grouping keys during the translation phase. 

Searching around, I saw that JDBC (https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html#key-handling) relies on explicit key passing via the PRIMARY KEY constraint. However, this would require additional manual insertion which I am trying to avoid.

What would be the proper way to receive the unique keys for upsert queries with the new DynamicTableSink API?

--
Best Regards,
Yuval Itzchakov.

Re: DynamicTableSink's equivalent of UpsertStreamTableSink.setKeyFields

Timo Walther
Hi Yuval,

We changed this behavior a bit to be more SQL compliant. Currently,
sinks must be explicitly defined with a PRIMARY KEY constraint. We had
discussions about implicit sinks, but nothing on the roadmap yet. The
`CREATE TEMPORARY TABLE LIKE` clause should make it easy to extend the
original table with just a primary key.
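
A minimal sketch of what such a statement could look like, assuming a base table named `orders` with a NOT NULL column `order_id` (both names are hypothetical, as is the helper class). The helper only assembles the DDL string from a list of key columns. It does not discover the keys, and it does no escaping of identifiers that themselves contain backticks.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Flink: builds a
// CREATE TEMPORARY TABLE ... LIKE statement that adds only a primary key.
class PrimaryKeyLikeDdl {

    static String buildDdl(String newTable, String baseTable, List<String> keyColumns) {
        if (keyColumns.isEmpty()) {
            throw new IllegalArgumentException("at least one key column is required");
        }
        // Quote each key column with backticks and join them with commas.
        String keys = keyColumns.stream()
                .map(c -> "`" + c + "`")
                .collect(Collectors.joining(", "));
        // Flink does not enforce primary keys itself, hence NOT ENFORCED.
        return "CREATE TEMPORARY TABLE `" + newTable + "` (\n"
                + "  PRIMARY KEY (" + keys + ") NOT ENFORCED\n"
                + ") LIKE `" + baseTable + "`";
    }

    public static void main(String[] args) {
        // Table and column names are assumptions for illustration only.
        System.out.println(buildDdl("orders_upsert", "orders", Arrays.asList("order_id")));
    }
}
```

The resulting string would then be passed to `TableEnvironment.executeSql`, and the sink can be written to through the derived table. Whether a constraint-only column list is accepted may depend on the Flink version, so treat this as a starting point rather than a verified recipe.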

Regards,
Timo


On 03.02.21 14:09, Yuval Itzchakov wrote:

> Hi,
> I'm reworking an existing UpsertStreamTableSink into the new
> DynamicTableSink API. In the previous API, one would get the unique keys
> for upsert queries via the `setKeyFields` method, which would calculate
> them based on the grouping keys during the translation phase.
>
> Searching around, I saw that JDBC
> (https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html#key-handling)
> relies on explicit key passing via the PRIMARY KEY constraint. However,
> this would require additional manual insertion which I am trying to avoid.
>
> What would be the proper way to receive the unique keys for upsert
> queries with the new DynamicTableSink API?
>
> --
> Best Regards,
> Yuval Itzchakov.


Re: DynamicTableSink's equivalent of UpsertStreamTableSink.setKeyFields

Yuval Itzchakov
Hi Timo,

The problem with this is I would still have to determine the keys manually, which is not really feasible in my case. Is there any internal API that might be of use to extract this information?

On Wed, Feb 3, 2021 at 5:19 PM Timo Walther wrote:

> Hi Yuval,
>
> We changed this behavior a bit to be more SQL compliant. Currently,
> sinks must be explicitly defined with a PRIMARY KEY constraint. We had
> discussions about implicit sinks, but nothing on the roadmap yet. The
> `CREATE TEMPORARY TABLE LIKE` clause should make it easy to extend the
> original table with just a primary key.
>
> Regards,
> Timo



--
Best Regards,
Yuval Itzchakov.

Re: DynamicTableSink's equivalent of UpsertStreamTableSink.setKeyFields

Timo Walther
For this you might need to go a level deeper.

Maybe the legacy util
org.apache.flink.table.planner.plan.utils.UpdatingPlanChecker can help
you. It analyzes the plan to figure out the keys.
org.apache.flink.table.planner.plan.metadata.FlinkRelMdUniqueKeys seems
to be the newer version.

Regards,
Timo

On 03.02.21 16:24, Yuval Itzchakov wrote:

> Hi Timo,
>
> The problem with this is I would still have to determine the keys
> manually, which is not really feasible in my case. Is there any internal
> API that might be of use to extract this information?