Effect of renaming a primary key


Effect of renaming a primary key

Rex Fenley
Hello,

When two column names collide in a join, the user is forced to rename one of the columns for the join to be valid. However, if those columns are primary keys such as "id", won't that change the key used to reference the state in RocksDB and in a checkpoint for the associated table? How might this affect state storage and checkpointing? Will the renamed PK remain a key for state, or is some other mechanism used to form a key?

My example:

As you can see, I rename "id" to "users_user_id". This is because the following code must join two tables that both have the PK "id".

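import org.apache.flink.table.api._ // provides the $"..." column syntax

// tableEnv and membershipsTable are assumed to be defined elsewhere in the job
tableEnv.executeSql("""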
CREATE TABLE topic_users (
id BIGINT,
deleted_at BIGINT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
...
)
""")
val usersNotDeletedTable =
tableEnv
.from("topic_users")
.select($"*")
// Will PK automatically change to "users_user_id"?
.renameColumns($"id".as("users_user_id"))
.filter($"deleted_at".isNull)

val membershipsNotDeletedTable =
membershipsTable // This table also has "id" PK
.join(
usersNotDeletedTable,
$"user_id" === $"users_user_id"
)
.dropColumns($"users_user_id")

Thanks!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US


Re: Effect of renaming a primary key

Leonard Xu
Hi, Rex


> Won't that therefore change the key used to reference the state in RocksDB and in a checkpoint for the associated table?
> How might this affect state storage and checkpointing?
> Will the PK that was renamed remain a key for state, or is some other mechanism used to form a key?

Yes, I think Flink can still infer the primary key from the renamed PK column; the state and checkpoints are not affected by the rename.
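A toy model in plain Scala may make this concrete. It is not Flink's state backend; it only assumes, as with Flink's keyed state, that state is addressed by the value of the key column(s) rather than by the column's name. `RenameKeyModel`, `renameColumn`, `stateKey`, and `countPerKey` are hypothetical names, and the model says nothing about restoring a checkpoint after the query itself has been changed.

```scala
// Toy model in plain Scala, not Flink's state backend.
// Assumption: keyed state is addressed by the *value* of the key column,
// not by the column's name.
object RenameKeyModel {
  // A row modeled as column name -> value.
  type Row = Map[String, Any]

  // Rename a column while keeping its value, similar in spirit to renameColumns.
  def renameColumn(row: Row, from: String, to: String): Row =
    row.get(from) match {
      case Some(v) => (row - from) + (to -> v)
      case None    => row
    }

  // Read the state key from whichever column currently holds the PK.
  def stateKey(row: Row, pkColumn: String): Any = row(pkColumn)

  // Count rows per key, standing in for any keyed operator's state.
  def countPerKey(rows: Seq[Row], pkColumn: String): Map[Any, Int] =
    rows.groupBy(r => stateKey(r, pkColumn)).map { case (k, rs) => k -> rs.size }

  def main(args: Array[String]): Unit = {
    val rows: Seq[Row] = Seq(Map("id" -> 1L), Map("id" -> 2L), Map("id" -> 1L))
    val renamed = rows.map(r => renameColumn(r, "id", "users_user_id"))
    // Same keys and same per-key state before and after the rename.
    println(countPerKey(rows, "id") == countPerKey(renamed, "users_user_id"))
  }
}
```

Running `main` prints `true`: the per-key counts are identical before and after renaming `id` to `users_user_id`.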

BTW, Flink uses the primary key in some optimizations, but it does not check primary key integrity, because Flink does not own the data the way a database does.
So the user should ensure the uniqueness of the key when defining a primary key; but as far as I know, it's unusual for data from Kafka to be unique,
and this may lead to unexpected results.
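One way a non-unique "primary key" can produce unexpected results is sketched below as a plain-Scala toy model (not Flink code), assuming an operator that upserts by the declared key. `UnenforcedPkModel`, `UserRow`, its hypothetical `name` column, and `upsertByKey` are illustrative only.

```scala
// Toy model in plain Scala, not Flink code: an operator that trusts a
// declared-but-unenforced primary key and upserts by it.
object UnenforcedPkModel {
  // `name` is a hypothetical column used only for illustration.
  final case class UserRow(id: Long, name: String)

  // Keep only the latest row per declared key, as an upsert by PK would.
  def upsertByKey(rows: Seq[UserRow]): Map[Long, UserRow] =
    rows.foldLeft(Map.empty[Long, UserRow]) { (table, row) =>
      table + (row.id -> row)
    }

  def main(args: Array[String]): Unit = {
    // Two *different* users that wrongly share id = 1.
    val rows = Seq(UserRow(1L, "alice"), UserRow(1L, "bob"), UserRow(2L, "carol"))
    val table = upsertByKey(rows)
    // Only 2 of the 3 distinct input rows survive.
    println(table.size)
  }
}
```

Running `main` prints `2`: three distinct rows went in, but the row for "alice" was silently overwritten because it shared `id = 1` with "bob".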

Best,
Leonard

Re: Effect of renaming a primary key

Rex Fenley
>Yes, I think Flink can still infer the primary key from the renamed PK column; the state and checkpoints are not affected by the rename.

Thanks for the info, that's good news.

>but as far as I know, it's unusual for data from Kafka to be unique

I'm using Debezium CDC, so the PK is unique per row, and each update event carries the PK of the row it changes. The PK needs to stay the same for a given row so that the state represented by that row gets updated accordingly.
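The role the PK plays here can be sketched as a toy replay of change events into a table keyed by `id`. This is plain Scala, not Flink or Debezium code; `ChangeEvent` is a simplified stand-in for Debezium's envelope that keeps only `op`, `before`, and `after`, with Debezium's op codes (`c` create, `r` snapshot read, `u` update, `d` delete). `CdcReplayModel`, `UserRow`, `ChangeEvent`, and `replay` are hypothetical names.

```scala
// Toy model in plain Scala, not Flink or Debezium code: replaying simplified
// change events, in order, into a table keyed by the primary key.
object CdcReplayModel {
  // Row image carrying the columns of topic_users.
  final case class UserRow(id: Long, deletedAt: Option[Long])

  // Simplified envelope: op code plus before/after row images.
  final case class ChangeEvent(op: String, before: Option[UserRow], after: Option[UserRow])

  // Apply events in order; the PK decides which row each event touches.
  def replay(events: Seq[ChangeEvent]): Map[Long, UserRow] =
    events.foldLeft(Map.empty[Long, UserRow]) { (table, e) =>
      e.op match {
        case "c" | "r" | "u" => e.after.fold(table)(row => table + (row.id -> row))
        case "d"             => e.before.fold(table)(row => table - row.id)
        case _               => table // other op codes are ignored in this sketch
      }
    }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      ChangeEvent("c", None, Some(UserRow(1L, None))),
      ChangeEvent("c", None, Some(UserRow(2L, None))),
      // The update carries the same PK (id = 1) as the original insert.
      ChangeEvent("u", Some(UserRow(1L, None)), Some(UserRow(1L, Some(1602800000L)))),
      ChangeEvent("d", Some(UserRow(2L, None)), None)
    )
    println(replay(events))
  }
}
```

With the events in `main`, user 2 is deleted and user 1 ends up with its updated `deletedAt`, because every event for a row carries that row's PK.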

Thanks!

On Fri, Oct 16, 2020 at 12:29 AM Leonard Xu <[hidden email]> wrote: