RocksDB serialization issue

RocksDB serialization issue

aldu29
Hello,

I am hitting this exception in my DataStream app and I can't find the root cause.
The job consumes data from Kafka and fails when I try to get a value from my MapState backed by RocksDB.
It worked in the previous release of my app, but I can't pinpoint what changed.

java.lang.ArrayIndexOutOfBoundsException: 512
        at org.apache.flink.api.common.typeutils.base.EnumSerializer.deserialize(EnumSerializer.java:130)
        at org.apache.flink.api.common.typeutils.base.EnumSerializer.deserialize(EnumSerializer.java:50)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:143)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
        at org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserValue(RocksDBMapState.java:344)
        at org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:123)
        at org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
..

Flink version: 1.9.2

David


Re: RocksDB serialization issue

Arvid Heise-3
Hi David,

the obvious reason would be that your state stored an enum value that no longer exists: Flink tries to deserialize the 512th constant of your enum, which is not available.

However, since it's highly unlikely that you actually have that many constants in a single enum class, we are most likely looking at a corrupted stream, which is hard to fix. Could you describe which state you have?
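
For illustration, the (de)serialization scheme is roughly the following; this is a simplification, not the exact EnumSerializer source, and MyEnum, values and indexOf stand in for the serializer's internal constant table:

void serialize(MyEnum value, DataOutputView target) throws IOException {
    target.writeInt(indexOf(value)); // only the constant's index is stored
}

MyEnum deserialize(DataInputView source) throws IOException {
    // If the int read here is not really an enum index (corrupted stream),
    // or the constant was removed, values[...] throws
    // ArrayIndexOutOfBoundsException -- in your case with index 512.
    return values[source.readInt()];
}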

Did you upgrade Flink or your application? If it's Flink, it's a bug. If it's the application, the state may be incompatible and would need to be migrated.

Did you restart from checkpoint or savepoint?



Re: RocksDB serialization issue

aldu29
Hello Arvid,

After some investigation with the help of a colleague, we finally found the root cause.
To speed up the state initialization, I created a few threads to parallelize the reading of the bucket files.
This is a temporary solution, as I plan to move to the State Processor API.
Here is an excerpt of the code:

ExecutorService executorService = Executors.newFixedThreadPool(initStateMaxThreads);
for (FileStatus bucketFile : xxx) {
    executorService.submit(() -> {
        try {
            readBucketFct(XXX); // updates the state with the bucket content
        } catch (Exception e) {
            .... // error handling; increments readMetaErrors
        }
    });
}
executorService.shutdown();
boolean terminated = executorService.awaitTermination(initStateTimeoutSeconds, TimeUnit.SECONDS);
if (!terminated || readMetaErrors.get() > 0) {
    throw new SinkException("Init state failed...");
}


After some tests: with one thread in my executorService it works, but with two threads the job fails.
Can I mitigate this behaviour while waiting for the switch to the State Processor API?
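
One mitigation I could try in the meantime (a rough sketch; parseBucketFile, KeyType and ValueType are placeholder names, and it assumes readBucketFct can be split into a parse step and a state-update step): keep the file reading and parsing in the pool, but apply every MapState update from the task thread, since Flink state is not meant to be touched from user-spawned threads.

List<Future<List<Map.Entry<KeyType, ValueType>>>> futures = new ArrayList<>();
for (FileStatus bucketFile : bucketFiles) {
    // Only file I/O and ORC parsing happen in the pool: no state access here.
    futures.add(executorService.submit(() -> parseBucketFile(bucketFile)));
}
executorService.shutdown();
for (Future<List<Map.Entry<KeyType, ValueType>>> future : futures) {
    // get() blocks on the task thread, so the MapState is only touched here.
    for (Map.Entry<KeyType, ValueType> entry :
            future.get(initStateTimeoutSeconds, TimeUnit.SECONDS)) {
        mapState.put(entry.getKey(), entry.getValue());
    }
}

The timeout handling differs slightly (each future gets its own budget here), but the point is that the state updates stay single-threaded.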

Thanks
David




Re: RocksDB serialization issue

Arvid Heise-3
Hi David,

could you please explain what you are actually trying to achieve?

It seems like you are reading some files from S3 in SinkFunction#open and putting them into state (bootstrapping?).
How many instances of the sink are executed?
How do you shard the buckets, i.e., how do you avoid reading the same file on multiple parallel sinks?
Is your sink running in a keyed context? Maybe even provide the general pipeline.



Re: RocksDB serialization issue

aldu29
Yes Arvid, the sink is keyed by a String: dbName::tableName.
The input is Kafka, but to initialize the state we have to read Hive delta files before consuming the Kafka records. These are ORC files, with one directory per table.
A given key (primary key) lives in only one bucket file. So, to initialize the state per table (per keyed stream, in fact), we created a pool of threads to read several bucket files in parallel.
This initialization runs when the first record for a table is read from Kafka and the state for that table does not yet exist (so this code lives in the process method).
Afterwards we can snapshot the state and reuse it, but this initialization must be done at least once.
We can have more than one instance of the sink, but each task runs in its own JVM and we can't have more than one task per table (keyed stream) at the moment. Sharding has been developed but not yet tested.
We use a YARN session and specified the --slots option to force one task per TaskManager, because we depend on a library that is not thread-safe.
So, if I'm right, we can't read the same bucket file on multiple parallel sinks at the moment.
But yes, to make the per-table state initialization faster, I naively created this pool of threads.
If I could keep this as a workaround it would be great, while waiting for a better solution (sharding, State Processor API, ...).
I'm open to any suggestion for the short or the long term.
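
For the State Processor API path, here is roughly what I have in mind for bootstrapping the MapState offline into a savepoint (a sketch against the flink-state-processor-api module of Flink 1.9; readOrcBuckets, RowValue, the descriptor name, the uid and the paths are all placeholders):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// readOrcBuckets is a hypothetical helper that parses the ORC bucket files
// into (tableKey, row) pairs, keyed by "dbName::tableName".
DataSet<Tuple2<String, RowValue>> rows = readOrcBuckets(env);

BootstrapTransformation<Tuple2<String, RowValue>> transformation =
        OperatorTransformation
                .bootstrapWith(rows)
                .keyBy(t -> t.f0)
                .transform(new KeyedStateBootstrapFunction<String, Tuple2<String, RowValue>>() {
                    private transient MapState<String, RowValue> state;

                    @Override
                    public void open(Configuration parameters) {
                        state = getRuntimeContext().getMapState(
                                new MapStateDescriptor<>("tableState", String.class, RowValue.class));
                    }

                    @Override
                    public void processElement(Tuple2<String, RowValue> row, Context ctx) throws Exception {
                        state.put(row.f1.primaryKey(), row.f1); // MapState keyed by primary key
                    }
                });

Savepoint
        .create(new RocksDBStateBackend("hdfs://..."), 128 /* maxParallelism */)
        .withOperator("sink-uid", transformation)
        .write("hdfs://path/to/bootstrap-savepoint");

The streaming job would then be submitted with --fromSavepoint pointing at the written path, and the lazy init in the process method could go away.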

Thanks
