Object and Integer size in RocksDB ValueState

Maciej Obuchowski
Hey.

We have a deduplication job with a large amount of keyed ValueState. We want to decrease the state size as much as possible, so we're using ValueState<Object>, as Object is the smallest possible Java non-primitive. However, as per https://www.baeldung.com/java-size-of-object (and my measurements), a Java Integer has the same in-memory size as an Object due to padding.
Will this still be true with RocksDB state? Can we put an Integer in state without increasing the state size?
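
For readers following the padding argument, here is a minimal sketch of the arithmetic. It assumes a 64-bit HotSpot JVM with compressed class pointers (a 12-byte object header) and 8-byte object alignment; other JVMs or settings can give different numbers. The class and method names are hypothetical and only illustrate the calculation.

```java
final class ObjectSizeSketch {
    // Assumption: 8-byte mark word + 4-byte compressed class pointer.
    static final int HEADER_BYTES = 12;
    // Assumption: default HotSpot object alignment.
    static final int ALIGNMENT_BYTES = 8;

    // Rounds header + field bytes up to the next multiple of the alignment.
    static int alignedSize(int fieldBytes) {
        int raw = HEADER_BYTES + fieldBytes;
        return (raw + ALIGNMENT_BYTES - 1) / ALIGNMENT_BYTES * ALIGNMENT_BYTES;
    }

    public static void main(String[] args) {
        // Object has no fields; Integer holds one 4-byte int.
        System.out.println("Object:  " + alignedSize(0) + " bytes");
        System.out.println("Integer: " + alignedSize(Integer.BYTES) + " bytes");
    }
}
```

Under these assumptions both come out at 16 bytes: the 4 bytes of padding that a bare Object carries are exactly where the int field of an Integer fits.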

Thanks, Maciej

Re: Object and Integer size in RocksDB ValueState

Roman Khachatryan
Hi Maciej,

If I understand correctly, you're asking whether ValueState parameterized with Object has the same size as the one with Integer (given that the actual stored objects (integers) are the same).
With RocksDB, any state object is serialized first and only then stored in the MemTable or in an SST file. So it doesn't matter as long as the same serializer is used.

You probably should try enabling compression if you didn't already: https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#compression

Regards,
Roman


Re: Object and Integer size in RocksDB ValueState

Maciej Obuchowski
Hey. Let me send a simplified example, because I don't think the assumption
"(given that the actual stored objects (integers) are the same)" holds.
I'm just storing an Object as a placeholder:

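import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class DeduplicationProcessFunction<K, IN> extends KeyedProcessFunction<K, IN, IN>
        implements CheckpointedFunction {

    // Presence marker only: a non-null value means the key was already seen.
    private transient ValueState<Object> processedState;

    public DeduplicationProcessFunction() { }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception { }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ValueStateDescriptor<Object> descriptor = new ValueStateDescriptor<>("processed",
            TypeInformation.of(Object.class));
        processedState = context.getKeyedStateStore().getState(descriptor);
    }

    @Override
    public void processElement(IN value, Context ctx, Collector<IN> out) throws Exception {
        Object processed = processedState.value();
        // Emit only the first element seen for each key.
        if (processed == null) {
            processedState.update(new Object());
            out.collect(value);
        }
    }
}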



Basically, I'm not sure what RocksDB stores in this case. I'm sure
that it needs to store the key, which is a 32-byte SHA key in this case.
What's the value? Is it the 16 bytes that Java requires in memory? If
I change my ValueState to Integer and store an additional value
there, will it require more storage space? Also, to respond to your
point about compression: we're using incremental checkpoints, so as per
the docs I don't think anything will change. I'm interested not only
in snapshot size, but also in the size of the current in-memory and
local-disk state.

Thanks,
Maciej



Re: Object and Integer size in RocksDB ValueState

Roman Khachatryan
Thanks for the clarification.

RocksDB stores whatever value Flink passes to it after serialization.
The value is passed as an array of bytes, so the minimum is a single byte.
An Integer requires 4 bytes, an Object 1 or 2 bytes depending on the serializer (Pojo or Kryo), and a boolean just 1 byte.
Besides that, boolean serialization is apparently faster.
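
The 4-byte and 1-byte figures can be checked with a small JDK-only sketch. It assumes that Flink's built-in int and boolean serializers write values through `writeInt` and `writeBoolean` calls that follow the `java.io.DataOutput` contract, so a plain `DataOutputStream` stands in for Flink here. The class and method names are hypothetical, and the Pojo/Kryo encoding of a bare Object is not reproduced.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class SerializedSizeSketch {
    // Number of bytes produced by DataOutput.writeInt for one int.
    static int intSize(int value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Number of bytes produced by DataOutput.writeBoolean for one boolean.
    static int booleanSize(boolean value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeBoolean(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        System.out.println("int value bytes:     " + intSize(1));
        System.out.println("boolean value bytes: " + booleanSize(true));
    }
}
```

The in-memory object size (header plus padding) plays no role here: only the serialized payload reaches RocksDB.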

The in-memory size, the on-disk size, and the snapshot size are all affected proportionally.

You are right that Flink's compression settings will not have any impact with incremental checkpoints.

Regards,
Roman


Re: Object and Integer size in RocksDB ValueState

Maciej Obuchowski
Thanks Roman, that's exactly what I needed.
