Hi.
When our jobs are catching up, they read at 10-20 times the normal rate, but then we lose our task managers to OOM. We could increase the memory allocation, but is there a way to figure out how high a rate we can consume with the current memory and slot allocation, and a way to limit the input to avoid OOM?

Best regards,
Lasse Nedergaard
Hi Lasse,
To avoid OOM exceptions you should analyze your Flink job implementation. Are you creating a lot of objects within your Flink functions? Which state backend are you using? Can you tell us a little more about your pipeline?

Usually, there should be enough memory for the network buffers and state. Once processing is not fast enough and the network buffers fill up, the input is limited anyway, which results in back-pressure.

Regards,
Timo

On 21.03.18 at 21:21, Lasse Nedergaard wrote:
> Hi.
>
> When our jobs are catching up, they read at 10-20 times the normal rate, but then we lose our task managers to OOM. We could increase the memory allocation, but is there a way to figure out how high a rate we can consume with the current memory and slot allocation, and a way to limit the input to avoid OOM?
>
> Best regards,
> Lasse Nedergaard
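The back-pressure behaviour described in the reply can be illustrated without Flink: when a bounded buffer sits between a fast reader and a slow operator, the reader is forced to wait once the buffer is full, so the number of in-flight records never exceeds the buffer size. The following is a minimal sketch in plain Java and not Flink's actual network stack; the class name `BackPressureSketch`, the buffer capacity, and the record counts are assumptions chosen for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class BackPressureSketch {

    /**
     * Pushes `records` items from a fast producer to a slow consumer through
     * a bounded buffer and returns the largest buffer size the producer observed.
     * Hypothetical helper for illustration only.
     */
    public static int maxBuffered(int capacity, int records) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        AtomicInteger peak = new AtomicInteger();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < records; i++) {
                    // put() blocks while the buffer is full: this is the back-pressure.
                    buffer.put(i);
                    peak.accumulateAndGet(buffer.size(), Math::max);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            for (int i = 0; i < records; i++) {
                buffer.take();
                Thread.sleep(1); // simulate an operator that is slower than the source
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining the buffer", e);
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak buffered records: " + maxBuffered(8, 200));
    }
}
```

However fast the producer is, the peak stays at or below the capacity. Memory held in such buffers is therefore bounded, which suggests looking for an OOM cause in objects held outside them.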
Hi,

For sure I can share more info. We run Flink 1.4.2 (but have the same problems on 1.3.2) on an AWS EMR cluster, with 6 task managers on each m4.xlarge slave. The task manager heap is set to 1850. We use the RocksDBStateBackend. We have set akka.ask.timeout to 60 s in case GC should delay heartbeats, and yarn.maximum-failed-containers to 10000 to have some buffer before we lose our YARN session.

One of our jobs reads data from Kinesis as a JSON string and maps it into an object. Then we do some enrichment in a CoProcessFunction. If we can't find the data in the co-process stream, we do a lookup through an AsyncDataStream. Then we merge the two streams so that we have one stream where enrichment has taken place. We then parse the binary data, create new objects, and output one main stream and 4 side-output streams. The number of objects in and out of this map function should be 1 to 1. For some of the side-output streams we do additional enrichment before all 5 streams are stored in Kinesis.

I have now implemented a maximum number of records read from Kinesis, and by doing that I can avoid losing my task managers, but now I can't catch up as fast as I would like. I have only seen back-pressure once, and that was for another job that uses iteration, and it never returned from that state.

So yes, we create objects. I guess we create around 10-20 objects for each input object, and I would like to understand what is going on so I can make an implementation that takes care of it. But is there a way to configure Flink so it will spill to disk instead of going OOM? I would prefer a slow system to a dead system.

Please let me know if you need additional information or if this doesn't make sense.

Lasse Nedergaard

2018-03-26 12:29 GMT+02:00 Timo Walther <[hidden email]>:
> Hi Lasse,
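The cap on records read from Kinesis mentioned in the message above is not shown in the thread. One way such a cap could be expressed is through the consumer properties of the Flink Kinesis connector. The sketch below only builds the `Properties` object and estimates the resulting per-shard read ceiling. The two key strings are, to my knowledge, the values of `ConsumerConfigConstants.SHARD_GETRECORDS_MAX` and `ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS` and should be checked against the connector version in use. The class name `KinesisReadThrottle` and the example numbers are hypothetical.

```java
import java.util.Properties;

public class KinesisReadThrottle {

    // Assumed to match ConsumerConfigConstants.SHARD_GETRECORDS_MAX and
    // ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS in the Flink
    // Kinesis connector; verify against your connector version.
    static final String GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";
    static final String GETRECORDS_INTERVAL_MILLIS = "flink.shard.getrecords.intervalmillis";

    /** Builds consumer properties that cap how much each shard fetch may return. */
    public static Properties consumerConfig(int maxRecordsPerCall, long intervalMillis) {
        if (maxRecordsPerCall <= 0 || intervalMillis <= 0) {
            throw new IllegalArgumentException("both limits must be positive");
        }
        Properties props = new Properties();
        props.setProperty(GETRECORDS_MAX, Integer.toString(maxRecordsPerCall));
        props.setProperty(GETRECORDS_INTERVAL_MILLIS, Long.toString(intervalMillis));
        return props;
    }

    /**
     * Upper bound on records per second read from one shard, ignoring fetch
     * latency: at most one call per interval, each returning at most
     * maxRecordsPerCall records.
     */
    public static double maxRecordsPerSecondPerShard(int maxRecordsPerCall, long intervalMillis) {
        if (maxRecordsPerCall <= 0 || intervalMillis <= 0) {
            throw new IllegalArgumentException("both limits must be positive");
        }
        return maxRecordsPerCall * 1000.0 / intervalMillis;
    }

    public static void main(String[] args) {
        Properties props = consumerConfig(500, 1000L);
        System.out.println(props);
        System.out.println(maxRecordsPerSecondPerShard(500, 1000L) + " records/s per shard at most");
    }
}
```

In a real job these properties would be passed to the `FlinkKinesisConsumer` constructor together with the AWS region and credentials settings. Multiplying the per-shard ceiling by the number of shards gives a rough bound on the catch-up rate, which can then be raised step by step while watching heap usage.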