Problem with overridden hashCode/equals in keys in Flink 1.11.3 when checkpointing with RocksDB


David Haglund

I have encountered a problem in Flink when trying to upgrade from Flink 1.9.1 to Flink 1.11.3.

The problem is a combination of two components:

* Keys implemented as case classes in Scala where we override the equals and hashCode methods. The case classes have additional fields which are not used in the keyBy (hashCode/equals) but which can have different values for a specific key (as defined by the fields we do care about).
* Checkpointing with RocksDB

In Flink 1.9.1 everything worked fine, but in Flink 1.11.3 we got aggregations for each unique key including the fields which we did not want to include in the keyBy, and which we explicitly do not use in hashCode and equals. It looks like hashCode is ignored in the keyBy in our case when we use RocksDB for checkpoints.
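
For illustration, a minimal sketch of the kind of key class I mean (the names are made up, not our actual code):

case class EventKey(id: String, detail: String) {
  // `id` defines key identity; `detail` is carried along but is
  // deliberately excluded from equals/hashCode.
  override def hashCode(): Int = id.hashCode

  override def equals(other: Any): Boolean = other match {
    case that: EventKey => this.id == that.id
    case _              => false
  }
}

With the heap-based state backends, two EventKey instances that differ only in `detail` are treated as the same key; with RocksDB their serialized bytes differ, so they are aggregated separately.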

 

We do not see this problem if we disable checkpointing or when using FsStateBackend.
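
For reference, this is roughly how we switch state backends when reproducing it (the checkpoint path and interval below are just examples):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object BackendSetup {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(10000L) // checkpoint every 10 seconds

    // With the RocksDB backend the issue reproduces:
    env.setStateBackend(new RocksDBStateBackend("file:///tmp/checkpoints", true))

    // With FsStateBackend (or with checkpointing disabled) it does not:
    // env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
  }
}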

 

I have seen this with "Incremental Window Aggregation with AggregateFunction" [1], but a colleague of mine reported that he had seen the same issue with a KeyedProcessFunction too.

 

We are using Scala version 2.11.12 and Java 8.

 

This looks like a bug to me. Is it a known issue or a new one?

 

Best regards,

/David Haglund

 

[1] Incremental Window Aggregation with AggregateFunction

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/windows.html#incremental-window-aggregation-with-aggregatefunction

 

David Haglund
Systems Engineer
Fleet Perception for Maintenance
NIRA Dynamics AB
Wallenbergs gata 4
58330 Linköping
Sweden
Mobile: +46 705 634 848
[hidden email]
www.niradynamics.se
Together for smarter safety

Re: Problem with overridden hashCode/equals in keys in Flink 1.11.3 when checkpointing with RocksDB

David Haglund

I have an update. I have created a small project on GitHub, https://github.com/daha/flink-key-by-problem-with-rocksdb-state, which reproduces the issue.

 

There seems to be a problem with RocksDB in all versions I have tested (1.7.1 and later). In Flink 1.9.x, only one of the events is counted with RocksDB. In Flink 1.10.x and later, all events are counted, but under separate keys, when all/both events should be counted under the same key.

 

The main branch in my sample project uses Flink 1.11.3; there are also branches for Flink 1.9.1, 1.10.3 and 1.12.1.

 

Best regards,

/David Haglund

 

From: David Haglund <[hidden email]>
Date: Wednesday, 20 January 2021 at 09:38

[quoted message trimmed]



Re: Problem with overridden hashCode/equals in keys in Flink 1.11.3 when checkpointing with RocksDB

David Haglund

A colleague of mine found a hint under "Avro types" [2] on the state schema evolution page:

 

Example: RocksDB state backend relies on binary objects identity, rather than hashCode method implementation. Any changes to the keys object structure could lead to non deterministic behaviour.

 

I guess it is a known issue then, but it would be good to at least include that kind of fundamental information on the state backends page as well.

 

Best regards,

/David Haglund

 

[2]

https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html#avro-types

 

 

From: David Haglund <[hidden email]>
Date: Wednesday, 20 January 2021 at 19:57

[quoted messages trimmed]



Re: Problem with overridden hashCode/equals in keys in Flink 1.11.3 when checkpointing with RocksDB

rmetzger0
Hey David,

this is a good catch! I've filed a JIRA ticket to address this in the docs more prominently: https://issues.apache.org/jira/browse/FLINK-21073

Thanks a lot for reporting this issue!

On Thu, Jan 21, 2021 at 9:24 AM David Haglund <[hidden email]> wrote:

[quoted messages trimmed]



Re: Problem with overridden hashCode/equals in keys in Flink 1.11.3 when checkpointing with RocksDB

Yun Tang
Hi David,

Thanks for your enthusiasm in figuring out the root cause. The key difference is that RocksDB holds binary objects, whose identity is defined only by their serialized bytes, while the Fs/MemoryStateBackend holds objects on the heap, whose identity is defined by hashCode and equals. If you want to achieve the same effect as with the MemoryStateBackend, please provide a custom serializer for your user-defined classes [1] that ignores some fields (note, however, that those fields will be lost when deserializing).
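
For example, a minimal sketch of such a serializer for a made-up key class SensorKey(id, detail), where only `id` should define key identity (how to wire it into keyBy via a matching TypeInformation is omitted here):

import org.apache.flink.api.common.typeutils.{SimpleTypeSerializerSnapshot, TypeSerializer, TypeSerializerSnapshot}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}

case class SensorKey(id: String, detail: String)

class SensorKeySerializer extends TypeSerializer[SensorKey] {
  override def isImmutableType: Boolean = true
  override def duplicate(): TypeSerializer[SensorKey] = this
  override def createInstance(): SensorKey = SensorKey("", "")
  override def copy(from: SensorKey): SensorKey = from
  override def copy(from: SensorKey, reuse: SensorKey): SensorKey = from
  override def copy(source: DataInputView, target: DataOutputView): Unit =
    target.writeUTF(source.readUTF())
  override def getLength: Int = -1 // variable-length records

  // Only the identity-defining field is written, so the bytes RocksDB
  // compares no longer depend on `detail`.
  override def serialize(record: SensorKey, target: DataOutputView): Unit =
    target.writeUTF(record.id)

  // The ignored field cannot be restored: this is the "lost when
  // deserializing" caveat mentioned above.
  override def deserialize(source: DataInputView): SensorKey =
    SensorKey(source.readUTF(), "")
  override def deserialize(reuse: SensorKey, source: DataInputView): SensorKey =
    deserialize(source)

  override def equals(obj: Any): Boolean = obj.isInstanceOf[SensorKeySerializer]
  override def hashCode(): Int = getClass.hashCode
  override def snapshotConfiguration(): TypeSerializerSnapshot[SensorKey] =
    new SensorKeySerializerSnapshot
}

class SensorKeySerializerSnapshot extends SimpleTypeSerializerSnapshot[SensorKey](
  new java.util.function.Supplier[TypeSerializer[SensorKey]] {
    override def get(): TypeSerializer[SensorKey] = new SensorKeySerializer
  })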


Best
Yun Tang

From: Robert Metzger <[hidden email]>
Sent: Thursday, January 21, 2021 22:49
To: David Haglund <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Problem with overridden hashCode/equals in keys in Flink 1.11.3 when checkpointing with RocksDB
 
[quoted messages trimmed]



Re: Problem with overridden hashCode/equals in keys in Flink 1.11.3 when checkpointing with RocksDB

David Haglund

Hi Robert and Yun Tang,

 

Robert: Sounds good.

 

Yun: Thanks for the info about the custom serializer. I ended up hard-coding the fields which we did not want to use in the keyBy.
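
Something like this (names made up; the point is that the unwanted field is fixed to a constant before the keyBy, so the serialized key bytes no longer depend on it):

import org.apache.flink.streaming.api.scala._

object HardCodedKeyFields {
  case class Event(id: String, detail: String, value: Long)
  case class EventKey(id: String, detail: String)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Same logical key, different `detail`:
    val events: DataStream[Event] = env.fromElements(
      Event("a", "x", 1L),
      Event("a", "y", 2L))

    events
      .keyBy(e => EventKey(e.id, detail = "")) // hard-code the unwanted field
      .sum("value")
      .print()

    env.execute("hard-coded key fields")
  }
}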

 

Thanks,

/David

 

From: Yun Tang <[hidden email]>
Date: Friday, 22 January 2021 at 04:52
To: Robert Metzger <[hidden email]>, David Haglund <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Problem with overridden hashCode/equals in keys in Flink 1.11.3 when checkpointing with RocksDB

[quoted messages trimmed]