Hey. We have a deduplication job with a large amount of keyed ValueState. We want to reduce the state size as much as possible, so we're using ValueState<Object>, since Object is the smallest possible Java non-primitive. However, as per https://www.baeldung.com/java-size-of-object (and my own measurements), a Java Integer has the same in-memory size as an Object due to padding. Will this still be true with RocksDB state? Can we put an Integer in state without increasing the state size? Thanks, Maciej |
Hi Maciej,

If I understand correctly, you're asking whether a ValueState parameterized with Object has the same size as one parameterized with Integer (given that the actual stored objects (integers) are the same). With RocksDB, any state object is serialized first and only then stored in a MemTable or an SST file. So the declared type doesn't matter as long as the same serializer is used.

You should probably try enabling compression if you haven't already: https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#compression

Regards,
Roman

On Tue, Feb 23, 2021 at 12:40 PM Maciej Obuchowski <[hidden email]> wrote:
|
Hey. Let me send a simplified example, because I don't think this "(given that the actual stored objects (integers) are the same)" is true - I'm just storing an Object as a placeholder:

    import lombok.val;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class DeduplicationProcessFunction<K, IN> extends KeyedProcessFunction<K, IN, IN>
            implements CheckpointedFunction {

        // Non-null once the current key has been seen; the stored value is only a marker.
        private transient ValueState<Object> processedState;

        public DeduplicationProcessFunction() { }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception { }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            val descriptor = new ValueStateDescriptor<>("processed", TypeInformation.of(Object.class));
            processedState = context.getKeyedStateStore().getState(descriptor);
        }

        @Override
        public void processElement(IN value, Context ctx, Collector<IN> out) throws Exception {
            val processed = processedState.value();
            // Emit only the first element for each key and mark the key as processed.
            if (processed == null) {
                processedState.update(new Object());
                out.collect(value);
            }
        }
    }

Basically, I'm not sure what RocksDB stores in this case. I'm sure that it needs to store the key, which is a 32-byte SHA key in this case. What's the value? Is it the 16 bytes that Java requires in memory? If I change my ValueState to Integer and store an additional value there, will it require more storage space?

Also, to respond to your point about compression: we're using incremental checkpoints, so as per the docs I don't think anything will change. I'm interested not only in snapshot size, but also in the size of the current in-memory and local-disk state.

Thanks,
Maciej

On Tue, Feb 23, 2021 at 17:53, Roman Khachatryan <[hidden email]> wrote:
|
Thanks for the clarification.

RocksDB stores whatever value Flink passes to it after serialization. The value is passed as an array of bytes, so the minimum is a single byte. An Integer would require 4 bytes, an Object 1 or 2 bytes depending on the serializer (Pojo or Kryo), and a boolean just 1 byte. Besides that, boolean serialization is apparently faster.

Sizes in memory, on disk, and in the snapshot are all affected proportionally.

You are right that Flink's compression settings will not have any impact with incremental checkpoints.

Regards,
Roman

On Wed, Feb 24, 2021 at 11:01 AM Maciej Obuchowski <[hidden email]> wrote:
|
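As a rough illustration of the byte counts mentioned above, the following sketch uses only the JDK, not Flink. It assumes that the serializers for int and boolean end up writing through the same java.io.DataOutput methods (writeInt and writeBoolean), which is my understanding of Flink's built-in serializers but is not taken from this thread. The class name SerializedSizeSketch, the helper names, and the entry count of one million are hypothetical. The payload estimate counts only the 32-byte key and the serialized value; it ignores anything extra that Flink or RocksDB store per entry.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class SerializedSizeSketch {

    /** Bytes emitted by DataOutputStream.writeInt for a single int. */
    public static int intValueBytes() {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeInt(1);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.size();
    }

    /** Bytes emitted by DataOutputStream.writeBoolean for a single boolean. */
    public static int booleanValueBytes() {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeBoolean(true);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.size();
    }

    /** Raw key + value payload in bytes; storage-engine overhead is NOT modeled. */
    public static long payloadBytes(long entryCount, int keyBytes, int valueBytes) {
        return entryCount * (keyBytes + valueBytes);
    }

    public static void main(String[] args) {
        int keyBytes = 32;         // SHA key size mentioned in the thread
        long entries = 1_000_000L; // hypothetical number of deduplicated keys

        System.out.println("int value bytes:     " + intValueBytes());     // 4
        System.out.println("boolean value bytes: " + booleanValueBytes()); // 1
        System.out.println("payload with int:     "
                + payloadBytes(entries, keyBytes, intValueBytes()));
        System.out.println("payload with boolean: "
                + payloadBytes(entries, keyBytes, booleanValueBytes()));
    }
}
```

With a 32-byte key, the value is a small share of each entry either way: under these assumptions, moving from a 4-byte int to a 1-byte boolean saves 3 bytes out of 36 per entry.
|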
Thanks Roman, that's exactly what I needed.
On Wed, Feb 24, 2021 at 14:37, Roman Khachatryan <[hidden email]> wrote:
|