Flink- Heap Space running out


Nishant Gupta

I am running a query to join a stream and a table, as shown below. It is running out of heap space even though the Flink cluster has enough heap space (60 GB * 3).

Is an eviction strategy needed for this query?

SELECT sourceKafka.* FROM sourceKafka INNER JOIN DefaulterTable ON sourceKafka.CC=DefaulterTable.CC;  

Thanks 

Nishant 


Re: Flink- Heap Space running out

miki haiat
You can configure the task manager memory in the flink-conf.yaml file.
What is the current configuration?
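
For example, with placeholder values (the right sizes depend on your setup):

jobmanager.heap.size: 2048m
taskmanager.heap.size: 8192m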

On Thu, Sep 26, 2019, 17:14 Nishant Gupta <[hidden email]> wrote:

I am running a query to join a stream and a table, as shown below. It is running out of heap space even though the Flink cluster has enough heap space (60 GB * 3).

Is an eviction strategy needed for this query?

SELECT sourceKafka.* FROM sourceKafka INNER JOIN DefaulterTable ON sourceKafka.CC=DefaulterTable.CC;  

Thanks 

Nishant 


Re: Flink- Heap Space running out

Fabian Hueske
Hi,

I don't think that the memory configuration is the issue.
The problem is the join query. The join does not have any temporal boundaries.
Therefore, both tables are completely stored in memory and never released.

You can configure a memory eviction strategy via idle state retention [1] but you should make sure that this is really what you want.
Alternatively, try a time-windowed join or a join with a temporal table function.
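
For illustration, a temporal table function over DefaulterTable could be registered in the SQL Client environment file roughly as sketched below. This is only a sketch: the time attribute defaulter_time on DefaulterTable and the processing-time attribute proc_time on sourceKafka are assumptions, since the actual schemas are not shown.

tables:
  - name: DefaulterHistory
    type: temporal-table
    history-table: DefaulterTable
    time-attribute: defaulter_time   # assumed time attribute of DefaulterTable
    primary-key: CC

The join would then look something like:

SELECT sourceKafka.*
FROM sourceKafka,
  LATERAL TABLE (DefaulterHistory(sourceKafka.proc_time)) AS D
WHERE sourceKafka.CC = D.CC;

With a processing-time attribute, only the latest version of DefaulterTable is kept per key, so the join state stays bounded.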

Best, Fabian


On Thu, 26 Sept 2019 at 17:08, miki haiat <[hidden email]> wrote:
You can configure the task manager memory in the flink-conf.yaml file.
What is the current configuration?

On Thu, Sep 26, 2019, 17:14 Nishant Gupta <[hidden email]> wrote:

I am running a query to join a stream and a table, as shown below. It is running out of heap space even though the Flink cluster has enough heap space (60 GB * 3).

Is an eviction strategy needed for this query?

SELECT sourceKafka.* FROM sourceKafka INNER JOIN DefaulterTable ON sourceKafka.CC=DefaulterTable.CC;  

Thanks 

Nishant 


Re: Flink- Heap Space running out

Nishant Gupta

Hi Fabian and Miki,

flink-conf.yaml [in a 3-node cluster, each node with 120 GB memory and 3 TB of disk]:
jobmanager.heap.size: 50120m
taskmanager.heap.size: 50120m

With idle state retention configured as below (same heap space issue):
execution:
  planner: old
  type: streaming
  time-characteristic: event-time
  periodic-watermarks-interval: 200
  result-mode: table
  max-table-result-rows: 1000000
  parallelism: 3
  max-parallelism: 128
  min-idle-state-retention: 300000
  max-idle-state-retention: 600000  


With a time-windowed join (same heap space issue):
SELECT K.* FROM KafkaSource AS K, BadIP AS B WHERE K.sourceip = B.bad_ip AND B.b_proctime BETWEEN K.k_proctime - INTERVAL '20' MINUTE AND K.k_proctime + INTERVAL '5' MINUTE 

I have tried temporal table functions, and they work fine.

I was really hoping to make it work with idle state retention and the time-windowed join. Could you please check the configuration and query?
Please let me know if any other details are required.


On Thu, Sep 26, 2019 at 8:46 PM Fabian Hueske <[hidden email]> wrote:
Hi,

I don't think that the memory configuration is the issue.
The problem is the join query. The join does not have any temporal boundaries.
Therefore, both tables are completely stored in memory and never released.

You can configure a memory eviction strategy via idle state retention [1] but you should make sure that this is really what you want.
Alternatively, try a time-windowed join or a join with a temporal table function.

Best, Fabian


On Thu, 26 Sept 2019 at 17:08, miki haiat <[hidden email]> wrote:
You can configure the task manager memory in the flink-conf.yaml file.
What is the current configuration?

On Thu, Sep 26, 2019, 17:14 Nishant Gupta <[hidden email]> wrote:

I am running a query to join a stream and a table, as shown below. It is running out of heap space even though the Flink cluster has enough heap space (60 GB * 3).

Is an eviction strategy needed for this query?

SELECT sourceKafka.* FROM sourceKafka INNER JOIN DefaulterTable ON sourceKafka.CC=DefaulterTable.CC;  

Thanks 

Nishant 


Re: Flink- Heap Space running out

Nishant Gupta
Apologies ****correction to the previous email****

Hi Fabian and Miki,

flink-conf.yaml [in a 3-node cluster, each node with 120 GB memory and 3 TB of disk]:
jobmanager.heap.size: 50120m
taskmanager.heap.size: 50120m

With idle state retention configured as below (same heap space issue):
execution:
  planner: old
  type: streaming
  time-characteristic: event-time
  periodic-watermarks-interval: 200
  result-mode: table
  max-table-result-rows: 1000000
  parallelism: 3
  max-parallelism: 128
  min-idle-state-retention: 300000
  max-idle-state-retention: 600000  


With a time-windowed join (records get missed or duplicated depending on the time interval at which I push the bad IPs):
SELECT K.* FROM KafkaSource AS K, BadIP AS B WHERE K.sourceip = B.bad_ip AND B.b_proctime BETWEEN K.k_proctime - INTERVAL '20' MINUTE AND K.k_proctime + INTERVAL '5' MINUTE 

I have tried temporal table functions, and they work fine.

I was really hoping to make it work with idle state retention and the time-windowed join. Could you please check the configuration and query?
Please let me know if any other details are required.

On Fri, Sep 27, 2019 at 12:41 PM Nishant Gupta <[hidden email]> wrote:

Hi Fabian and Miki,

flink-conf.yaml [in a 3-node cluster, each node with 120 GB memory and 3 TB of disk]:
jobmanager.heap.size: 50120m
taskmanager.heap.size: 50120m

With idle state retention configured as below (same heap space issue):
execution:
  planner: old
  type: streaming
  time-characteristic: event-time
  periodic-watermarks-interval: 200
  result-mode: table
  max-table-result-rows: 1000000
  parallelism: 3
  max-parallelism: 128
  min-idle-state-retention: 300000
  max-idle-state-retention: 600000  


With a time-windowed join (same heap space issue):
SELECT K.* FROM KafkaSource AS K, BadIP AS B WHERE K.sourceip = B.bad_ip AND B.b_proctime BETWEEN K.k_proctime - INTERVAL '20' MINUTE AND K.k_proctime + INTERVAL '5' MINUTE 

I have tried temporal table functions, and they work fine.

I was really hoping to make it work with idle state retention and the time-windowed join. Could you please check the configuration and query?
Please let me know if any other details are required.


On Thu, Sep 26, 2019 at 8:46 PM Fabian Hueske <[hidden email]> wrote:
Hi,

I don't think that the memory configuration is the issue.
The problem is the join query. The join does not have any temporal boundaries.
Therefore, both tables are completely stored in memory and never released.

You can configure a memory eviction strategy via idle state retention [1] but you should make sure that this is really what you want.
Alternatively, try a time-windowed join or a join with a temporal table function.

Best, Fabian


On Thu, 26 Sept 2019 at 17:08, miki haiat <[hidden email]> wrote:
You can configure the task manager memory in the flink-conf.yaml file.
What is the current configuration?

On Thu, Sep 26, 2019, 17:14 Nishant Gupta <[hidden email]> wrote:

I am running a query to join a stream and a table, as shown below. It is running out of heap space even though the Flink cluster has enough heap space (60 GB * 3).

Is an eviction strategy needed for this query?

SELECT sourceKafka.* FROM sourceKafka INNER JOIN DefaulterTable ON sourceKafka.CC=DefaulterTable.CC;  

Thanks 

Nishant