Flink workers OOM Stream2Batch application


Lopez, Javier
Hi all,

One of our use cases involves Stream2Batch processing. We use Flink to read from a streaming source, apply some transformations to the stream, and deliver files to S3. These Flink jobs do not run 24/7; they run on demand and consume a finite number of records. Once a job has consumed all its records, we stop it.

We have seen that for this particular use case memory consumption increases over time, and after several hours the instance crashes with the error "There is insufficient memory for the Java Runtime Environment to continue." We have monitored both the heap memory and the OS memory. Heap usage remains constant over time, but OS memory keeps increasing until the worker is killed.
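For anyone who wants to reproduce the heap versus non-heap comparison from inside the JVM, here is a minimal sketch using only the standard java.lang.management API. The class name MemorySnapshot and its helper methods are hypothetical and are not part of Flink; the sketch assumes it is called from code running inside the worker JVM (for example, logged periodically), and it only covers memory the JVM itself reports, not every native allocation the OS sees.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;

// Hypothetical helper: reports heap, non-heap, direct-buffer, and class-loading
// figures for the JVM it runs in, using standard JMX platform beans.
class MemorySnapshot {

    // Bytes currently used on the Java heap.
    static long heapUsedBytes() {
        return ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
    }

    // Bytes used by JVM-managed non-heap pools (e.g. class metadata, code cache).
    static long nonHeapUsedBytes() {
        return ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage().getUsed();
    }

    // Estimated bytes held by direct NIO buffers; unavailable estimates (-1) are skipped.
    static long directBufferBytes() {
        long total = 0;
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName()) && pool.getMemoryUsed() > 0) {
                total += pool.getMemoryUsed();
            }
        }
        return total;
    }

    // Number of classes currently loaded in this JVM.
    static int loadedClassCount() {
        return ManagementFactory.getClassLoadingMXBean().getLoadedClassCount();
    }

    // Number of classes unloaded since the JVM started.
    static long unloadedClassCount() {
        return ManagementFactory.getClassLoadingMXBean().getUnloadedClassCount();
    }

    // One log-friendly line combining all of the figures above.
    static String snapshot() {
        return "heapUsed=" + heapUsedBytes()
                + " nonHeapUsed=" + nonHeapUsedBytes()
                + " directBuffers=" + directBufferBytes()
                + " loadedClasses=" + loadedClassCount()
                + " unloadedClasses=" + unloadedClassCount();
    }

    public static void main(String[] args) {
        System.out.println(snapshot());
    }
}
```

Comparing these lines before and after several job submissions would show whether the growth is in memory the JVM accounts for (non-heap pools, direct buffers, loaded classes) or somewhere it does not report.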

We are using other Flink clusters for other use cases, with jobs that have been running 24/7 for several months without a problem. We also checked the memory behavior on them and found that both heap and OS memory usage remain constant.

We also made another comparison: we dumped the JVM objects to check how many of them each cluster keeps in memory. We found that the cluster running these stream2batch jobs, after only 4 hours, keeps almost 5 times more objects in memory than the other cluster, which has been processing data constantly, 24/7, for 2 months.

We gave this "short" introduction to our use case because we believe there is a problem when we stop the Flink jobs. It looks like memory is not released when the jobs are stopped. Is this expected behavior?

BTW, we are using this version: FLINK_VERSION="1.3.0" HADOOP_VERSION="hadoop27" SCALA_VERSION="scala_2.10"
