Heap memory not released after aggregation operator

Heap memory not released after aggregation operator

Shashank Timmarajus
Hello all,

I am running a Flink streaming job that consumes messages from Kafka, aggregates the source records, and writes the results to S3. Something like below:





My TargetRecord is an ArrayList of byte arrays, and these accumulate in memory over time (after 4 days of continuous running). Below is the heap analysis from one machine, taken shortly before a crash: 6 GB out of 8 GB is occupied by the byte array ArrayList.
Is there anything wrong with what I am doing here, such as passing my aggregate result to an async function, then to a map function, and then to the sink?

Thanks for your time, much appreciated. 


--
Best Regards
Shashank

Re: Heap memory not released after aggregation operator

Fabian Hueske-2
Hi,

in a WindowedStream it is the responsibility of the Trigger to purge the window state at some point in time. Otherwise, the window operator accumulates data.
In your code snippet, you define a custom trigger but the call to use it is commented out.

The built-in trigger of a TimeWindow should correctly clean the state of a window when the watermark passes the window end boundary.
If your program fails with the custom trigger, I would have a closer look at its implementation.

Best, Fabian
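
To illustrate the point about purging, here is a minimal sketch in plain Java. It is a toy model of the behaviour described in the reply, not Flink's actual window operator, and it does not use the Flink API. The class `WindowPurgeSketch`, its methods, and the record sizes are all hypothetical; only the names `FIRE` and `FIRE_AND_PURGE` are borrowed from Flink's `TriggerResult`. It compares how much buffered data stays on the heap when a window is only fired versus fired and purged once the watermark passes its end.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Toy model of per-window buffered state. Hypothetical, NOT the Flink API. */
public class WindowPurgeSketch {

    /** Names borrowed from Flink's TriggerResult; the semantics here are simplified. */
    enum Result { FIRE, FIRE_AND_PURGE }

    /** Window end timestamp -> records buffered for that window. */
    private final Map<Long, List<byte[]>> state = new HashMap<>();
    private final long windowSizeMs;
    private final Result onWindowEnd;
    private long lastWatermarkMs = Long.MIN_VALUE;

    WindowPurgeSketch(long windowSizeMs, Result onWindowEnd) {
        this.windowSizeMs = windowSizeMs;
        this.onWindowEnd = onWindowEnd;
    }

    /** Buffers a record in the tumbling window that contains its timestamp. */
    void add(long timestampMs, byte[] record) {
        long windowEnd = (timestampMs / windowSizeMs + 1) * windowSizeMs;
        state.computeIfAbsent(windowEnd, k -> new ArrayList<>()).add(record);
    }

    /** Fires each window whose end was newly passed; returns the number of records emitted. */
    int advanceWatermark(long watermarkMs) {
        int emitted = 0;
        Iterator<Map.Entry<Long, List<byte[]>>> it = state.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<byte[]>> e = it.next();
            long end = e.getKey();
            if (end > lastWatermarkMs && end <= watermarkMs) {
                emitted += e.getValue().size();
                if (onWindowEnd == Result.FIRE_AND_PURGE) {
                    it.remove(); // purge: the buffered records become garbage-collectable
                }
            }
        }
        lastWatermarkMs = watermarkMs;
        return emitted;
    }

    /** Total size in bytes of all records still referenced by window state. */
    long retainedBytes() {
        long total = 0;
        for (List<byte[]> records : state.values()) {
            for (byte[] r : records) {
                total += r.length;
            }
        }
        return total;
    }

    /** Feeds 100 one-second windows with 10 records of 1 KiB each. */
    static long run(Result onWindowEnd) {
        WindowPurgeSketch op = new WindowPurgeSketch(1000L, onWindowEnd);
        for (int w = 0; w < 100; w++) {
            for (int i = 0; i < 10; i++) {
                op.add(w * 1000L + i, new byte[1024]);
            }
            op.advanceWatermark((w + 1) * 1000L);
        }
        return op.retainedBytes();
    }

    public static void main(String[] args) {
        System.out.println("FIRE only:      " + run(Result.FIRE) + " bytes retained");
        System.out.println("FIRE_AND_PURGE: " + run(Result.FIRE_AND_PURGE) + " bytes retained");
    }
}
```

In this model the fire-only variant keeps every record it has ever buffered (1024000 bytes after 100 windows), while the purging variant retains nothing once each window has fired. A custom trigger that never purges would show the same kind of steady growth over several days.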

(In reply to the message from Shashank Timmarajus <[hidden email]>, 2017-10-25 21:34 GMT+02:00.)