The application consumes from a single Kafka topic, deserializes the
JSON payload into POJOs, and uses a big keyed window (30+ days) for
deduplication, then emits the result for every single event to four
other keyed windows for aggregation. It looks roughly like the
following.

Source->KeyBy(A,B,C)
  |
  |                              -->KeyBy(A,B,C)->Hourly Window(sum)
  v                              |->KeyBy(A,B,C)->Daily Window(sum)
Big Window(sum, emit per event) -|
                                 |->KeyBy(D,E)->Hourly Window(sum)
                                 -->KeyBy(D,E)->Daily Window(sum)

The cardinality of the window keyed on (A,B,C) is high, potentially in
the millions. The values (A,B,C) are all strings.

I'm doing performance testing by letting the application consume the
past 7 days of data from Kafka. However, the performance is not good and
I'm having some trouble interpreting the results. All tests were done on
AWS using i3.xlarge instances with 2 slots per TM. This was tested with
one, three, and six TMs. Parallelism was set to the total number of
slots available, e.g. 6 for 3 TMs with 2 slots each.

- The application would always start at consuming ~500 messages/s from
  Kafka for about 20-30 minutes, then jump to ~5,000 messages/s. I
  noticed that the disk I/O would drop noticeably when the throughput
  jumped.

- Regardless of the number of TMs used, it always peaked at ~5,000
  messages/s and showed the same behavior as described above.

- The Flink UI always showed the Source back pressured by the Big window
  while the throughput was at ~500 messages/s, and no back pressure at
  all once it reached ~5,000 messages/s.

- I took some Flight Recorder recordings, and they showed that the Big
  window's time trigger thread was always doing
  SystemProcessingTimeService$TriggerTask.run(). Since I'm only
  triggering the Big window by event count, why would this be running?

- Flight Recorder also showed that the Big window thread was doing
  RocksDB writes or gets most of the time when the throughput was low. I
  understand that it keeps the state in RocksDB, but I wasn't expecting
  it to tank the performance like this.

- Flight Recorder showed that the hottest methods were all related to
  Kryo serialization.

- GC was fine: nothing longer than 20ms, and there weren't many
  collections.

My questions are:

- Why is the performance so bad, and why didn't it scale as I increased
  the number of TMs?

- Why would the performance jump suddenly after 20 minutes or so?

- I know the JSON and POJO serialization is not great. Could it be this
  bad?

Any insights or guidance on how I can diagnose the issue further will be
greatly appreciated.

Thanks,

Ning
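For reference, a minimal DataStream sketch of the topology described
above, assuming tumbling event-time windows and a count-of-one trigger
for the per-event emission; the class, field, and job names are
hypothetical, and the source is reduced to a single seed event to keep
the sketch self-contained:

    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;

    public class DedupTopologySketch {

        // Hypothetical stand-in for the JSON-derived POJO.
        public static class Event {
            public String a;
            public String b;
            public String c;
            public String d;
            public String e;
            public long value;
            public long timestamp;

            public Event() {}
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            // Stand-in for the Kafka/JSON source: a single seed event.
            Event seed = new Event();
            seed.a = "a1"; seed.b = "b1"; seed.c = "c1";
            seed.d = "d1"; seed.e = "e1";
            seed.value = 1L;
            seed.timestamp = System.currentTimeMillis();

            DataStream<Event> events = env
                    .fromElements(seed)
                    .assignTimestampsAndWatermarks(
                            new AscendingTimestampExtractor<Event>() {
                                @Override
                                public long extractAscendingTimestamp(Event e) {
                                    return e.timestamp;
                                }
                            });

            // 30-day keyed "big" window that fires for every element
            // (the deduplication step).
            DataStream<Event> deduped = events
                    .keyBy("a", "b", "c")
                    .window(TumblingEventTimeWindows.of(Time.days(30)))
                    .trigger(CountTrigger.of(1))
                    .sum("value");

            // Four downstream keyed aggregations fed by the per-event
            // output of the big window.
            deduped.keyBy("a", "b", "c")
                    .window(TumblingEventTimeWindows.of(Time.hours(1)))
                    .sum("value").print();
            deduped.keyBy("a", "b", "c")
                    .window(TumblingEventTimeWindows.of(Time.days(1)))
                    .sum("value").print();
            deduped.keyBy("d", "e")
                    .window(TumblingEventTimeWindows.of(Time.hours(1)))
                    .sum("value").print();
            deduped.keyBy("d", "e")
                    .window(TumblingEventTimeWindows.of(Time.days(1)))
                    .sum("value").print();

            env.execute("dedup-topology-sketch");
        }
    }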
If you have a window larger than hours, then you need to rethink your
architecture - this is not streaming anymore. Just because you receive
events in a streaming fashion doesn't mean you need to do all the
processing in a streaming fashion.
Can you store the events in a file or a database and then run batch
processing on them after 30 days? Another aspect to investigate could be
why your source sends duplicated entries.
> If you have a window larger than hours, then you need to rethink your
> architecture - this is not streaming anymore. Just because you receive
> events in a streaming fashion doesn't mean you need to do all the
> processing in a streaming fashion.
Thanks for the thoughts, I'll keep that in mind. However, in the test,
it was not storing more than two days' worth of data yet. I'm very much
interested in understanding the root cause of the low performance before
moving on to do major restructuring.

> Can you store the events in a file or a database and then run batch
> processing on them after 30 days?

The 30 day window is just used for deduplication, but it triggers for
every event and sends the result downstream so that we can still get
real-time analytics on the events.

> Another aspect to investigate could be why your source sends
> duplicated entries.

They are not 100% duplicate events syntactically. The events are only
duplicates in a logical sense. For example, the same person doing the
same action multiple times at different times of the day.

Ning
Hi Ning,

could you replace the Kafka source by a custom SourceFunction
implementation which just produces new events in a loop as fast as
possible? This way we can rule out that the ingestion is responsible for
the performance jump or the limit at 5,000 events/s, and can benchmark
the Flink job separately.

Kryo is much less efficient than Flink's POJO serializer. In the logs
you should see INFO messages about the classes for which Flink falls
back to Kryo. Try to replace those with Flink POJOs, i.e. default
constructor and public getters and setters. As Flink also needs to
serialize/deserialize each state object for reading and writing, this
applies to your state classes as well, not only to your events. The
lines you are looking for are:

"INFO org.apache.flink.api.java.typeutils.TypeExtractor - No fields were
detected for class com.company.YourClass so it cannot be used as a POJO
type and must be processed as GenericType. Please read the Flink
documentation on "Data Types & Serialization" for details of the effect
on performance."

You can disable the Kryo fallback temporarily as described in [1]. I
would suspect that serialization is a big factor right now, in
particular if you see Kryo methods taking a lot of time.

Best,

Konstantin
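A minimal sketch of the two things described here, with hypothetical
class and field names: a type shaped so that Flink's POJO serializer
(rather than Kryo) can handle it, and the temporary switch that makes
any remaining Kryo fallback fail at job submission instead of hiding in
the logs:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class PojoSerializationCheck {

        // Recognizable as a Flink POJO: public class, public no-arg
        // constructor, fields accessible via public getters/setters.
        public static class EventKey {
            private String a;
            private String b;
            private String c;

            public EventKey() {}

            public String getA() { return a; }
            public void setA(String a) { this.a = a; }
            public String getB() { return b; }
            public void setB(String b) { this.b = b; }
            public String getC() { return c; }
            public void setC(String c) { this.c = c; }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // Temporarily refuse any type that would fall back to Kryo,
            // so GenericType usage surfaces immediately.
            env.getConfig().disableGenericTypes();

            env.fromElements(new EventKey()).print();
            env.execute("pojo-serialization-check");
        }
    }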
Hi Konstantin,
> could you replace the Kafka source by a custom SourceFunction
> implementation which just produces new events in a loop as fast as
> possible? This way we can rule out that the ingestion is responsible
> for the performance jump or the limit at 5,000 events/s, and can
> benchmark the Flink job separately.

We built a custom source function and figured out the reason for the
sudden performance jump. It was caused by wrong watermarks in the
original Kafka stream. Certain events created watermarks in the future.
As soon as that happened, all subsequent events were dropped, hence the
throughput increase.

> Kryo is much less efficient than Flink's POJO serializer. In the logs
> you should see INFO messages about the classes for which Flink falls
> back to Kryo. Try to replace those with Flink POJOs, i.e. default
> constructor and public getters and setters. [...]

This was very helpful. We found a couple of key objects being serialized
using Kryo. After fixing them and making sure that they were properly
serialized as POJOs, the performance almost doubled from 500 events/s.

Thank you very much for the advice,

Ning
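For reference, a sketch of roughly what such a synthetic benchmark
source can look like; the Event type, class name, and key ranges here
are hypothetical, and it emits monotonically increasing event times so
the future-watermark problem described above cannot occur:

    import java.util.concurrent.ThreadLocalRandom;

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public class SyntheticEventSource
            implements SourceFunction<SyntheticEventSource.Event> {

        // Hypothetical event type standing in for the real JSON-derived POJO.
        public static class Event {
            public String a;
            public String b;
            public String c;
            public long timestamp;

            public Event() {}
        }

        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Event> ctx) {
            long eventTime = System.currentTimeMillis();
            while (running) {
                Event e = new Event();
                // Random keys to approximate the high (A,B,C) cardinality.
                e.a = "a-" + ThreadLocalRandom.current().nextInt(1_000_000);
                e.b = "b-" + ThreadLocalRandom.current().nextInt(1_000);
                e.c = "c-" + ThreadLocalRandom.current().nextInt(1_000);
                // Monotonically increasing event time, so no watermark can
                // jump into the future and drop later events.
                e.timestamp = eventTime++;
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(e);
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }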