In running tests of Flink jobs we are seeing some that yield really good performance (2.5M records in minutes) and others that are struggling to get past 200k records processed. In the latter case there are a large number of keys, and each key gets state in the form of 3 value states. One holds a string and the others hold a Map of Lists of events (JSONObject subclasses with custom Java serialization). There is also a MapState for each key that will hold one entry for each event matching that key (string->string).

The program starts out processing 1000-1500 records/sec (on my 4-year-old laptop) and gets progressively slower; it is at about 400/sec when processing the 500,000th event.

When using JProfiler on the test (local environment running under Eclipse), it indicates that 70-80% of the execution time is spent in Kryo serialization methods.

The above is with the MemoryStateBackend; the RocksDBStateBackend runs at about 1/2 to 2/3 of that speed.

Any suggestions on how to reduce or identify the source of the serialization performance issue are welcome.

Michael

Hi Michael,

Things I could suggest are:

* First of all, Kryo serialization is a fallback that is used when no better-suited serializer is available. As you said you are using some complex JSONObjects, I would recommend writing your own org.apache.flink.api.common.typeutils.TypeSerializer for them.
* You could try to reduce the number of events you keep in state; make sure you remove elements you no longer need.
* I would revisit the map of lists of JSONObject structure, as you have to de/serialize the whole list on each access. Maybe you could implement it with e.g. two maps, the first map keeping just indices to JSONObjects in the other map? This way you won't have to deserialize whole lists of complex objects.
* Also make sure the performance bottleneck is in accessing the state and not in forwarding events: check the performance of a job with the same logic and events, but without state.

Hope those pointers will help improve your job's performance.

Best,
Dawid

On 31/10/2018 23:58, TechnoMage wrote:
> In running tests of Flink jobs we are seeing some that yield really good performance (2.5M records in minutes) and others that are struggling to get past 200k records processed. In the latter case there are a large number of keys, and each key gets state in the form of 3 value states. One holds a string and the others hold a Map of Lists of events (JSONObject subclasses with custom Java serialization). There is also a MapState for each key that will hold one entry for each event matching that key (string->string).
>
> The program starts out processing 1000-1500 records/sec (on my 4-year-old laptop) and gets progressively slower; it is at about 400/sec when processing the 500,000th event.
>
> When using JProfiler on the test (local environment running under Eclipse), it indicates that 70-80% of the execution time is spent in Kryo serialization methods.
>
> The above is with the MemoryStateBackend; the RocksDBStateBackend runs at about 1/2 to 2/3 of that speed.
>
> Any suggestions on how to reduce or identify the source of the serialization performance issue are welcome.
>
> Michael
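
As a rough illustration of the first suggestion in the reply (a hand-written serializer instead of the Kryo fallback), the sketch below writes a hypothetical event field by field against java.io.DataOutput and reads it back from java.io.DataInput. The SketchEvent class, its three fields, and the SketchEventCodec helper are assumptions made up for this example and are not taken from the thread. Flink's DataOutputView and DataInputView extend these two interfaces, so the same write/read logic could plausibly be moved into the serialize and deserialize methods of a TypeSerializer subclass; the other methods such a subclass needs (copying, snapshots, and so on) are not shown here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical event type; the real JSONObject subclasses will have different fields. */
final class SketchEvent {
    final String id;
    final long timestamp;
    final String payload;

    SketchEvent(String id, long timestamp, String payload) {
        this.id = id;
        this.timestamp = timestamp;
        this.payload = payload;
    }
}

/** Field-by-field binary codec (illustrative helper, not a Flink class). */
final class SketchEventCodec {
    private SketchEventCodec() {}

    /** Writes the fields in a fixed order, with no class names or reflection involved. */
    static void write(SketchEvent e, DataOutput out) throws IOException {
        out.writeUTF(e.id);       // writeUTF is limited to 65535 encoded bytes per string
        out.writeLong(e.timestamp);
        out.writeUTF(e.payload);
    }

    /** Reads the fields back in exactly the order they were written. */
    static SketchEvent read(DataInput in) throws IOException {
        String id = in.readUTF();
        long timestamp = in.readLong();
        String payload = in.readUTF();
        return new SketchEvent(id, timestamp, payload);
    }

    /** Convenience wrapper: serialize one event to a byte array. */
    static byte[] toBytes(SketchEvent e) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            write(e, out);
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return buffer.toByteArray();
    }

    /** Convenience wrapper: deserialize one event from a byte array. */
    static SketchEvent fromBytes(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return read(in);
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }
}
```

Calling SketchEventCodec.fromBytes(SketchEventCodec.toBytes(event)) should return an event with the same field values. Whether this is actually faster than Kryo for the real event classes would need to be measured with the profiler already in use.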
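
The two-map idea from the reply can be sketched in plain Java as well. This is only a stand-in: two HashMaps play the role that two MapState instances might play in the real job, and the class name TwoMapStateSketch, the group/event-id naming, and the decode counter are all assumptions introduced for illustration. The point it tries to show is that the index map holds only small ids, while each event is stored and decoded on its own, so reading one event does not touch the others.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java stand-in for per-key state split into an index map and an event map. */
final class TwoMapStateSketch {
    // group name -> ids of the events in that group (small values, cheap to rewrite)
    private final Map<String, List<String>> index = new HashMap<>();
    // event id -> serialized event, one entry per event
    private final Map<String, byte[]> events = new HashMap<>();
    // number of events decoded so far, to make the access cost visible
    private int decodeCount = 0;

    /** Registers the id under its group and stores the serialized event separately. */
    void add(String group, String eventId, String eventJson) {
        index.computeIfAbsent(group, g -> new ArrayList<>()).add(eventId);
        events.put(eventId, eventJson.getBytes(StandardCharsets.UTF_8));
    }

    /** Decodes exactly one event; other events in the same group are left untouched. */
    String get(String eventId) {
        byte[] raw = events.get(eventId);
        if (raw == null) {
            return null;
        }
        decodeCount++;
        return new String(raw, StandardCharsets.UTF_8);
    }

    /** Returns the ids in a group without decoding any event. */
    List<String> idsIn(String group) {
        List<String> ids = index.get(group);
        return ids == null ? Collections.<String>emptyList() : Collections.unmodifiableList(ids);
    }

    /** Drops an event that is no longer needed from both maps. */
    void remove(String group, String eventId) {
        List<String> ids = index.get(group);
        if (ids != null) {
            ids.remove(eventId);
            if (ids.isEmpty()) {
                index.remove(group);
            }
        }
        events.remove(eventId);
    }

    int decodeCount() {
        return decodeCount;
    }
}
```

The remove method also reflects the suggestion to drop elements that are no longer needed, so the state does not keep growing with every event. How much this layout helps in the actual job depends on the state backend and access pattern, so it is best treated as something to try and profile rather than a guaranteed fix.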