savepoint failure


savepoint failure

Radoslav Smilyanov
Hello all,

I am running a Flink job that performs data enrichment. The job has seven Kafka consumers that receive messages for DML statements performed on seven database tables.

Job setup:
  • Flink runs in Kubernetes (k8s) in a similar way as described here
  • 1 job manager and 2 task managers
  • parallelism set to 4, with 2 task slots per task manager
  • RocksDB as the state backend
  • Protobuf for serialization
Whenever I try to trigger a savepoint after my state is bootstrapped, I get the following error for different operators:

Caused by: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=32, endKeyGroup=63}.
at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:319)
at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:261)

Note: the failing key group varies between attempts.
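For context, the exception is thrown while the snapshot recomputes each key's key group. A simplified, self-contained sketch of that assignment (an assumption for illustration: Flink's real KeyGroupRangeAssignment additionally runs the hash through a murmur hash, but the principle is the same) shows how a hashCode that depends on mutable state can yield a group outside the range the operator owns:

```java
import java.util.Objects;

// A key whose hashCode depends on mutable state -- the failure mode behind
// "Key group X is not in KeyGroupRange{...}".
class MutableKey {
    int value;
    MutableKey(int value) { this.value = value; }
    @Override public int hashCode() { return Objects.hash(value); }
}

public class KeyGroupSketch {
    // Simplified sketch of key-group assignment (real Flink code also
    // applies a murmur hash before taking the modulo).
    static int assignToKeyGroup(int keyHash, int maxParallelism) {
        return Math.floorMod(keyHash, maxParallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        MutableKey key = new MutableKey(42);
        int groupAtPartitionTime = assignToKeyGroup(key.hashCode(), maxParallelism);

        key.value = 7; // key state mutated after the record was partitioned
        int groupAtSnapshotTime = assignToKeyGroup(key.hashCode(), maxParallelism);

        // The snapshot now computes a group this operator may not own.
        System.out.println(groupAtPartitionTime == groupAtSnapshotTime); // false
    }
}
```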

I found a Stack Overflow article that relates to such an exception (my job graph looks similar to the one described there, except that my job has more joins). I double-checked my hashCode implementations and I believe they are fine.

I tried reducing the parallelism to 1 with 1 task slot per task manager, and this configuration seems to work. This leads me to suspect some kind of concurrency issue.

I would like to understand what is causing the savepoint failure. Do you have any suggestions about what I might be missing?

Thanks in advance!

Best Regards,
Rado

Re: savepoint failure

Till Rohrmann
Hi Rado,

It is hard to tell the reason without a few more details. Could you share with us the complete logs of the problematic run? The job you are running, and the types you store as state in RocksDB and use as events, are also very important. In the linked SO question, the problem was a type whose hashCode was not immutable.

Cheers,
Till

On Wed, Oct 21, 2020 at 6:24 PM Radoslav Smilyanov <[hidden email]> wrote:

Re: savepoint failure

Till Rohrmann
Glad to hear that you solved your problem. AFAIK, Flink should not read the fields of messages and call hashCode on them directly.

Cheers,
Till

On Fri, Oct 23, 2020 at 2:18 PM Radoslav Smilyanov <[hidden email]> wrote:
Hi Till,

I found my problem. It was indeed related to a mutable hashCode.

I was using a protobuf message in the key selector function, and one of its fields was an enum. I checked the hashCode implementation of the generated message: it uses the enum's int value field, so I assumed it was fine and immutable.

I changed the key selector function to use a Tuple[Long, Int] instead (my protobuf message has only these two fields, where the Int stands for the enum's value field). After switching to the Tuple, it worked.
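The principle behind that fix can be sketched outside the Flink API (a hypothetical stand-in; in the real job this pair would be the Tuple2<Long, Integer> returned from the KeySelector): key by an immutable pair of stable values derived from the message, so that logically equal keys always hash identically.

```java
import java.util.Objects;

// Hypothetical stand-in for the tuple key: the record id plus the enum's
// stable int value, with a purely value-based hashCode.
final class StableKey {
    final long id;
    final int enumNumber;
    StableKey(long id, int enumNumber) { this.id = id; this.enumNumber = enumNumber; }
    @Override public boolean equals(Object o) {
        return o instanceof StableKey
                && ((StableKey) o).id == id
                && ((StableKey) o).enumNumber == enumNumber;
    }
    @Override public int hashCode() { return Objects.hash(id, enumNumber); }
}

public class StableKeyDemo {
    public static void main(String[] args) {
        // Two logically equal keys built at different times hash identically --
        // the property key-group assignment needs across snapshot and restore.
        StableKey a = new StableKey(17L, 3);
        StableKey b = new StableKey(17L, 3);
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode()); // true
    }
}
```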

I am not sure whether Flink somehow reads the protobuf message's fields and uses their hashCodes directly, since the generated protobuf enum does have a mutable hashCode (Enum.hashCode).
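The mutability mentioned here is easy to demonstrate: java.lang.Enum does not provide a value-based hashCode, so an enum constant's hash is the identity hash, which can differ between JVM runs, while the ordinal (or a protobuf enum's number) is stable. A small sketch with a made-up enum:

```java
// Hypothetical enum standing in for the generated protobuf enum.
enum Status { ACTIVE, DELETED }

public class EnumHashDemo {
    public static void main(String[] args) {
        // Enum.hashCode() falls back to the identity hash, so it is not
        // reproducible across JVM runs -- unusable for key-group assignment
        // that must match between snapshot and restore.
        System.out.println(Status.ACTIVE.hashCode()
                == System.identityHashCode(Status.ACTIVE)); // true

        // The ordinal (or the protobuf enum's number) is stable across runs.
        System.out.println(Status.ACTIVE.ordinal()); // 0
    }
}
```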

Nevertheless, it works fine with the Tuple key.

Thanks for your response!

Best Regards,
Rado

