batch job OOM

classic Classic list List threaded Threaded
18 messages Options
Reply | Threaded
Open this post in threaded view
|

batch job OOM

Fanbin Bu
Hi,

I have a batch job using blink planner. and got the following error. I was able to successfully run the same job with flink 1.8 on yarn.

I set conf as:
taskmanager.heap.size: 50000m

and flink UI gives me
Last Heartbeat:20-01-22 14:56:25ID:container_1579720108062_0018_01_000020Data Port:41029Free Slots / All Slots:1 / 0CPU Cores:16Physical Memory:62.9 GBJVM Heap Size:10.3 GBFlink Managed Memory:24.9 GB

any suggestions on how to move forward?
Thanks,
Fanbin

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 25 more
Caused by: java.io.IOException: Hash window aggregate map OOM.
at HashWinAggWithKeys$534.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Reply | Threaded
Open this post in threaded view
|

Re: batch job OOM

Fanbin Bu

tried to increase memory: flink run -m yarn-cluster -p 16 -ys 1 -ytm 200000 -yjm 8096 myjar
and still got the same OOM exception.
my sql is like:
select id, hop_end(created_at, interval '30' second, interval '1' minute), sum(field)... #20 of these sums
from table group by id, hop(created_at, interval '30' second, interval '1' minute)


On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu <[hidden email]> wrote:
Hi,

I have a batch job using blink planner. and got the following error. I was able to successfully run the same job with flink 1.8 on yarn.

I set conf as:
taskmanager.heap.size: 50000m

and flink UI gives me
Last Heartbeat:20-01-22 14:56:25ID:container_1579720108062_0018_01_000020Data Port:41029Free Slots / All Slots:1 / 0CPU Cores:16Physical Memory:62.9 GBJVM Heap Size:10.3 GBFlink Managed Memory:24.9 GB

any suggestions on how to move forward?
Thanks,
Fanbin

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 25 more
Caused by: java.io.IOException: Hash window aggregate map OOM.
at HashWinAggWithKeys$534.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Reply | Threaded
Open this post in threaded view
|

Re: batch job OOM

Jingsong Li
Hi Fanbin,

Thanks for using blink batch mode.

The OOM is caused by the manage memory not enough in Hash aggregation.

There are three options you can choose from:

1.Is your version Flink 1.9? 1.9 still use fix memory configuration. So you need increase hash memory:
- table.exec.resource.hash-agg.memory: 1024 mb

2.In 1.10, we use slot manage memory to dynamic config real operator memory, so operator can use more manage memory, so you don't need configure hash agg memory anymore. You can try 1.10 RC0 [1]

3.We can use sort aggregation to avoid OOM too, but there is no config option now, I created JIRA to track it. [2]


Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu <[hidden email]> wrote:

tried to increase memory: flink run -m yarn-cluster -p 16 -ys 1 -ytm 200000 -yjm 8096 myjar
and still got the same OOM exception.
my sql is like:
select id, hop_end(created_at, interval '30' second, interval '1' minute), sum(field)... #20 of these sums
from table group by id, hop(created_at, interval '30' second, interval '1' minute)


On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu <[hidden email]> wrote:
Hi,

I have a batch job using blink planner. and got the following error. I was able to successfully run the same job with flink 1.8 on yarn.

I set conf as:
taskmanager.heap.size: 50000m

and flink UI gives me
Last Heartbeat:20-01-22 14:56:25ID:container_1579720108062_0018_01_000020Data Port:41029Free Slots / All Slots:1 / 0CPU Cores:16Physical Memory:62.9 GBJVM Heap Size:10.3 GBFlink Managed Memory:24.9 GB

any suggestions on how to move forward?
Thanks,
Fanbin

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 25 more
Caused by: java.io.IOException: Hash window aggregate map OOM.
at HashWinAggWithKeys$534.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)


--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: batch job OOM

Fanbin Bu
Jingsong,

Thank you for the response.
Since I'm using flink on EMR and the latest version is 1.9 now. the second option is ruled out. but will keep that in mind for future upgrade.

I'm going to try the first option. It's probably a good idea to add that in the doc for example: https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html

Thanks,
Fanbin

On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li <[hidden email]> wrote:
Hi Fanbin,

Thanks for using blink batch mode.

The OOM is caused by the manage memory not enough in Hash aggregation.

There are three options you can choose from:

1.Is your version Flink 1.9? 1.9 still use fix memory configuration. So you need increase hash memory:
- table.exec.resource.hash-agg.memory: 1024 mb

2.In 1.10, we use slot manage memory to dynamic config real operator memory, so operator can use more manage memory, so you don't need configure hash agg memory anymore. You can try 1.10 RC0 [1]

3.We can use sort aggregation to avoid OOM too, but there is no config option now, I created JIRA to track it. [2]


Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu <[hidden email]> wrote:

tried to increase memory: flink run -m yarn-cluster -p 16 -ys 1 -ytm 200000 -yjm 8096 myjar
and still got the same OOM exception.
my sql is like:
select id, hop_end(created_at, interval '30' second, interval '1' minute), sum(field)... #20 of these sums
from table group by id, hop(created_at, interval '30' second, interval '1' minute)


On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu <[hidden email]> wrote:
Hi,

I have a batch job using blink planner. and got the following error. I was able to successfully run the same job with flink 1.8 on yarn.

I set conf as:
taskmanager.heap.size: 50000m

and flink UI gives me
Last Heartbeat:20-01-22 14:56:25ID:container_1579720108062_0018_01_000020Data Port:41029Free Slots / All Slots:1 / 0CPU Cores:16Physical Memory:62.9 GBJVM Heap Size:10.3 GBFlink Managed Memory:24.9 GB

any suggestions on how to move forward?
Thanks,
Fanbin

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 25 more
Caused by: java.io.IOException: Hash window aggregate map OOM.
at HashWinAggWithKeys$534.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)


--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: batch job OOM

Jingsong Li
Fanbin,

NOTE: you need configure this into TableConfig.

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu <[hidden email]> wrote:
Jingsong,

Thank you for the response.
Since I'm using flink on EMR and the latest version is 1.9 now. the second option is ruled out. but will keep that in mind for future upgrade.

I'm going to try the first option. It's probably a good idea to add that in the doc for example: https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html

Thanks,
Fanbin

On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li <[hidden email]> wrote:
Hi Fanbin,

Thanks for using blink batch mode.

The OOM is caused by the manage memory not enough in Hash aggregation.

There are three options you can choose from:

1.Is your version Flink 1.9? 1.9 still use fix memory configuration. So you need increase hash memory:
- table.exec.resource.hash-agg.memory: 1024 mb

2.In 1.10, we use slot manage memory to dynamic config real operator memory, so operator can use more manage memory, so you don't need configure hash agg memory anymore. You can try 1.10 RC0 [1]

3.We can use sort aggregation to avoid OOM too, but there is no config option now, I created JIRA to track it. [2]


Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu <[hidden email]> wrote:

tried to increase memory: flink run -m yarn-cluster -p 16 -ys 1 -ytm 200000 -yjm 8096 myjar
and still got the same OOM exception.
my sql is like:
select id, hop_end(created_at, interval '30' second, interval '1' minute), sum(field)... #20 of these sums
from table group by id, hop(created_at, interval '30' second, interval '1' minute)


On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu <[hidden email]> wrote:
Hi,

I have a batch job using blink planner. and got the following error. I was able to successfully run the same job with flink 1.8 on yarn.

I set conf as:
taskmanager.heap.size: 50000m

and flink UI gives me
Last Heartbeat:20-01-22 14:56:25ID:container_1579720108062_0018_01_000020Data Port:41029Free Slots / All Slots:1 / 0CPU Cores:16Physical Memory:62.9 GBJVM Heap Size:10.3 GBFlink Managed Memory:24.9 GB

any suggestions on how to move forward?
Thanks,
Fanbin

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 25 more
Caused by: java.io.IOException: Hash window aggregate map OOM.
at HashWinAggWithKeys$534.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)


--
Best, Jingsong Lee


--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: batch job OOM

Fanbin Bu
In reply to this post by Fanbin Bu

I saw the doc in https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/config.html.
Do i have to set that in the code or can i do it through flink-conf.yaml?

On Wed, Jan 22, 2020 at 7:54 PM Fanbin Bu <[hidden email]> wrote:
Jingsong,

Thank you for the response.
Since I'm using flink on EMR and the latest version is 1.9 now. the second option is ruled out. but will keep that in mind for future upgrade.

I'm going to try the first option. It's probably a good idea to add that in the doc for example: https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html

Thanks,
Fanbin

On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li <[hidden email]> wrote:
Hi Fanbin,

Thanks for using blink batch mode.

The OOM is caused by the manage memory not enough in Hash aggregation.

There are three options you can choose from:

1.Is your version Flink 1.9? 1.9 still use fix memory configuration. So you need increase hash memory:
- table.exec.resource.hash-agg.memory: 1024 mb

2.In 1.10, we use slot manage memory to dynamic config real operator memory, so operator can use more manage memory, so you don't need configure hash agg memory anymore. You can try 1.10 RC0 [1]

3.We can use sort aggregation to avoid OOM too, but there is no config option now, I created JIRA to track it. [2]


Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu <[hidden email]> wrote:

tried to increase memory: flink run -m yarn-cluster -p 16 -ys 1 -ytm 200000 -yjm 8096 myjar
and still got the same OOM exception.
my sql is like:
select id, hop_end(created_at, interval '30' second, interval '1' minute), sum(field)... #20 of these sums
from table group by id, hop(created_at, interval '30' second, interval '1' minute)


On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu <[hidden email]> wrote:
Hi,

I have a batch job using blink planner. and got the following error. I was able to successfully run the same job with flink 1.8 on yarn.

I set conf as:
taskmanager.heap.size: 50000m

and flink UI gives me
Last Heartbeat:20-01-22 14:56:25ID:container_1579720108062_0018_01_000020Data Port:41029Free Slots / All Slots:1 / 0CPU Cores:16Physical Memory:62.9 GBJVM Heap Size:10.3 GBFlink Managed Memory:24.9 GB

any suggestions on how to move forward?
Thanks,
Fanbin

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 25 more
Caused by: java.io.IOException: Hash window aggregate map OOM.
at HashWinAggWithKeys$534.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)


--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: batch job OOM

Fanbin Bu
In reply to this post by Jingsong Li
you beat me to it.
let's me try that.

On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li <[hidden email]> wrote:
Fanbin,

NOTE: you need configure this into TableConfig.

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu <[hidden email]> wrote:
Jingsong,

Thank you for the response.
Since I'm using flink on EMR and the latest version is 1.9 now. the second option is ruled out. but will keep that in mind for future upgrade.

I'm going to try the first option. It's probably a good idea to add that in the doc for example: https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html

Thanks,
Fanbin

On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li <[hidden email]> wrote:
Hi Fanbin,

Thanks for using blink batch mode.

The OOM is caused by the manage memory not enough in Hash aggregation.

There are three options you can choose from:

1.Is your version Flink 1.9? 1.9 still use fix memory configuration. So you need increase hash memory:
- table.exec.resource.hash-agg.memory: 1024 mb

2.In 1.10, we use slot manage memory to dynamic config real operator memory, so operator can use more manage memory, so you don't need configure hash agg memory anymore. You can try 1.10 RC0 [1]

3.We can use sort aggregation to avoid OOM too, but there is no config option now, I created JIRA to track it. [2]


Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu <[hidden email]> wrote:

tried to increase memory: flink run -m yarn-cluster -p 16 -ys 1 -ytm 200000 -yjm 8096 myjar
and still got the same OOM exception.
my sql is like:
select id, hop_end(created_at, interval '30' second, interval '1' minute), sum(field)... #20 of these sums
from table group by id, hop(created_at, interval '30' second, interval '1' minute)


On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu <[hidden email]> wrote:
Hi,

I have a batch job using blink planner. and got the following error. I was able to successfully run the same job with flink 1.8 on yarn.

I set conf as:
taskmanager.heap.size: 50000m

and flink UI gives me
Last Heartbeat:20-01-22 14:56:25ID:container_1579720108062_0018_01_000020Data Port:41029Free Slots / All Slots:1 / 0CPU Cores:16Physical Memory:62.9 GBJVM Heap Size:10.3 GBFlink Managed Memory:24.9 GB

any suggestions on how to move forward?
Thanks,
Fanbin

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 25 more
Caused by: java.io.IOException: Hash window aggregate map OOM.
at HashWinAggWithKeys$534.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)


--
Best, Jingsong Lee


--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: batch job OOM

Fanbin Bu
Jingsong,

Great, now i got a different error:

java.lang.NullPointerException: Initial Segment may not be null
	at org.apache.flink.runtime.memory.AbstractPagedOutputView.<init>(AbstractPagedOutputView.java:65)
	at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:49)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.<init>(BytesHashMap.java:522)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:190)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:149)
	at LocalHashWinAggWithKeys$292.open(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:748)

is there any other config i should add?
thanks,
Fanbin

On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu <[hidden email]> wrote:
you beat me to it.
let's me try that.

On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li <[hidden email]> wrote:
Fanbin,

NOTE: you need configure this into TableConfig.

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu <[hidden email]> wrote:
Jingsong,

Thank you for the response.
Since I'm using flink on EMR and the latest version is 1.9 now. the second option is ruled out. but will keep that in mind for future upgrade.

I'm going to try the first option. It's probably a good idea to add that in the doc for example: https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html

Thanks,
Fanbin

On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li <[hidden email]> wrote:
Hi Fanbin,

Thanks for using blink batch mode.

The OOM is caused by the manage memory not enough in Hash aggregation.

There are three options you can choose from:

1.Is your version Flink 1.9? 1.9 still use fix memory configuration. So you need increase hash memory:
- table.exec.resource.hash-agg.memory: 1024 mb

2.In 1.10, we use slot manage memory to dynamic config real operator memory, so operator can use more manage memory, so you don't need configure hash agg memory anymore. You can try 1.10 RC0 [1]

3.We can use sort aggregation to avoid OOM too, but there is no config option now, I created JIRA to track it. [2]


Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu <[hidden email]> wrote:

tried to increase memory: flink run -m yarn-cluster -p 16 -ys 1 -ytm 200000 -yjm 8096 myjar
and still got the same OOM exception.
my sql is like:
select id, hop_end(created_at, interval '30' second, interval '1' minute), sum(field)... #20 of these sums
from table group by id, hop(created_at, interval '30' second, interval '1' minute)


On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu <[hidden email]> wrote:
Hi,

I have a batch job using blink planner. and got the following error. I was able to successfully run the same job with flink 1.8 on yarn.

I set conf as:
taskmanager.heap.size: 50000m

and flink UI gives me
Last Heartbeat:20-01-22 14:56:25ID:container_1579720108062_0018_01_000020Data Port:41029Free Slots / All Slots:1 / 0CPU Cores:16Physical Memory:62.9 GBJVM Heap Size:10.3 GBFlink Managed Memory:24.9 GB

any suggestions on how to move forward?
Thanks,
Fanbin

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 25 more
Caused by: java.io.IOException: Hash window aggregate map OOM.
at HashWinAggWithKeys$534.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)


--
Best, Jingsong Lee


--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: batch job OOM

Jingsong Li
Fanbin,

Looks like your config is wrong, can you show your config code?

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu <[hidden email]> wrote:
Jingsong,

Great, now i got a different error:

java.lang.NullPointerException: Initial Segment may not be null
	at org.apache.flink.runtime.memory.AbstractPagedOutputView.<init>(AbstractPagedOutputView.java:65)
	at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:49)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.<init>(BytesHashMap.java:522)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:190)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:149)
	at LocalHashWinAggWithKeys$292.open(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:748)

is there any other config i should add?
thanks,
Fanbin

On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu <[hidden email]> wrote:
you beat me to it.
let's me try that.

On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li <[hidden email]> wrote:
Fanbin,

NOTE: you need configure this into TableConfig.

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu <[hidden email]> wrote:
Jingsong,

Thank you for the response.
Since I'm using flink on EMR and the latest version is 1.9 now. the second option is ruled out. but will keep that in mind for future upgrade.

I'm going to try the first option. It's probably a good idea to add that in the doc for example: https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html

Thanks,
Fanbin

On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li <[hidden email]> wrote:
Hi Fanbin,

Thanks for using blink batch mode.

The OOM is caused by the manage memory not enough in Hash aggregation.

There are three options you can choose from:

1.Is your version Flink 1.9? 1.9 still use fix memory configuration. So you need increase hash memory:
- table.exec.resource.hash-agg.memory: 1024 mb

2.In 1.10, we use slot manage memory to dynamic config real operator memory, so operator can use more manage memory, so you don't need configure hash agg memory anymore. You can try 1.10 RC0 [1]

3.We can use sort aggregation to avoid OOM too, but there is no config option now, I created JIRA to track it. [2]


Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu <[hidden email]> wrote:

tried to increase memory: flink run -m yarn-cluster -p 16 -ys 1 -ytm 200000 -yjm 8096 myjar
and still got the same OOM exception.
my sql is like:
select id, hop_end(created_at, interval '30' second, interval '1' minute), sum(field)... #20 of these sums
from table group by id, hop(created_at, interval '30' second, interval '1' minute)


On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu <[hidden email]> wrote:
Hi,

I have a batch job using blink planner. and got the following error. I was able to successfully run the same job with flink 1.8 on yarn.

I set conf as:
taskmanager.heap.size: 50000m

and flink UI gives me
Last Heartbeat:20-01-22 14:56:25ID:container_1579720108062_0018_01_000020Data Port:41029Free Slots / All Slots:1 / 0CPU Cores:16Physical Memory:62.9 GBJVM Heap Size:10.3 GBFlink Managed Memory:24.9 GB

any suggestions on how to move forward?
Thanks,
Fanbin

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 25 more
Caused by: java.io.IOException: Hash window aggregate map OOM.
at HashWinAggWithKeys$534.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)


--
Best, Jingsong Lee


--
Best, Jingsong Lee


--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: batch job OOM

Fanbin Bu
Jingsong,

I set the config value to be too large. After I changed it to a smaller number it works now!
thanks you for the help. really appreciate it!

Fanbin

On Wed, Jan 22, 2020 at 8:50 PM Jingsong Li <[hidden email]> wrote:
Fanbin,

Looks like your config is wrong, can you show your config code?

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu <[hidden email]> wrote:
Jingsong,

Great, now i got a different error:

java.lang.NullPointerException: Initial Segment may not be null
	at org.apache.flink.runtime.memory.AbstractPagedOutputView.<init>(AbstractPagedOutputView.java:65)
	at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:49)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.<init>(BytesHashMap.java:522)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:190)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:149)
	at LocalHashWinAggWithKeys$292.open(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:748)

is there any other config i should add?
thanks,
Fanbin

On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu <[hidden email]> wrote:
you beat me to it.
let's me try that.

On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li <[hidden email]> wrote:
Fanbin,

NOTE: you need configure this into TableConfig.

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu <[hidden email]> wrote:
Jingsong,

Thank you for the response.
Since I'm using flink on EMR and the latest version is 1.9 now. the second option is ruled out. but will keep that in mind for future upgrade.

I'm going to try the first option. It's probably a good idea to add that in the doc for example: https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html

Thanks,
Fanbin

On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li <[hidden email]> wrote:
Hi Fanbin,

Thanks for using blink batch mode.

The OOM is caused by the manage memory not enough in Hash aggregation.

There are three options you can choose from:

1.Is your version Flink 1.9? 1.9 still use fix memory configuration. So you need increase hash memory:
- table.exec.resource.hash-agg.memory: 1024 mb

2.In 1.10, we use slot manage memory to dynamic config real operator memory, so operator can use more manage memory, so you don't need configure hash agg memory anymore. You can try 1.10 RC0 [1]

3.We can use sort aggregation to avoid OOM too, but there is no config option now, I created JIRA to track it. [2]


Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu <[hidden email]> wrote:

tried to increase memory: flink run -m yarn-cluster -p 16 -ys 1 -ytm 200000 -yjm 8096 myjar
and still got the same OOM exception.
my sql is like:
select id, hop_end(created_at, interval '30' second, interval '1' minute), sum(field)... #20 of these sums
from table group by id, hop(created_at, interval '30' second, interval '1' minute)


On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu <[hidden email]> wrote:
Hi,

I have a batch job using blink planner. and got the following error. I was able to successfully run the same job with flink 1.8 on yarn.

I set conf as:
taskmanager.heap.size: 50000m

and flink UI gives me
Last Heartbeat:20-01-22 14:56:25ID:container_1579720108062_0018_01_000020Data Port:41029Free Slots / All Slots:1 / 0CPU Cores:16Physical Memory:62.9 GBJVM Heap Size:10.3 GBFlink Managed Memory:24.9 GB

any suggestions on how to move forward?
Thanks,
Fanbin

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 25 more
Caused by: java.io.IOException: Hash window aggregate map OOM.
at HashWinAggWithKeys$534.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)


--
Best, Jingsong Lee


--
Best, Jingsong Lee


--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: batch job OOM

Fanbin Bu
I got the following error when running another job. any suggestions?

Caused by: java.lang.IndexOutOfBoundsException
at org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:701)
at org.apache.flink.table.dataformat.BinaryRow.getInt(BinaryRow.java:264)
at HashWinAggWithKeys$538.endInput(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:276)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.checkFinished(StreamOneInputProcessor.java:151)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:138)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)

On Wed, Jan 22, 2020 at 8:57 PM Fanbin Bu <[hidden email]> wrote:
Jingsong,

I set the config value to be too large. After I changed it to a smaller number it works now!
thanks you for the help. really appreciate it!

Fanbin

On Wed, Jan 22, 2020 at 8:50 PM Jingsong Li <[hidden email]> wrote:
Fanbin,

Looks like your config is wrong, can you show your config code?

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu <[hidden email]> wrote:
Jingsong,

Great, now i got a different error:

java.lang.NullPointerException: Initial Segment may not be null
	at org.apache.flink.runtime.memory.AbstractPagedOutputView.<init>(AbstractPagedOutputView.java:65)
	at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:49)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.<init>(BytesHashMap.java:522)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:190)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:149)
	at LocalHashWinAggWithKeys$292.open(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:748)

is there any other config i should add?
thanks,
Fanbin

On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu <[hidden email]> wrote:
you beat me to it.
let's me try that.

On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li <[hidden email]> wrote:
Fanbin,

NOTE: you need configure this into TableConfig.

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu <[hidden email]> wrote:
Jingsong,

Thank you for the response.
Since I'm using flink on EMR and the latest version is 1.9 now. the second option is ruled out. but will keep that in mind for future upgrade.

I'm going to try the first option. It's probably a good idea to add that in the doc for example: https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html

Thanks,
Fanbin

On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li <[hidden email]> wrote:
Hi Fanbin,

Thanks for using blink batch mode.

The OOM is caused by the manage memory not enough in Hash aggregation.

There are three options you can choose from:

1.Is your version Flink 1.9? 1.9 still use fix memory configuration. So you need increase hash memory:
- table.exec.resource.hash-agg.memory: 1024 mb

2.In 1.10, we use slot manage memory to dynamic config real operator memory, so operator can use more manage memory, so you don't need configure hash agg memory anymore. You can try 1.10 RC0 [1]

3.We can use sort aggregation to avoid OOM too, but there is no config option now, I created JIRA to track it. [2]


Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu <[hidden email]> wrote:

tried to increase memory: flink run -m yarn-cluster -p 16 -ys 1 -ytm 200000 -yjm 8096 myjar
and still got the same OOM exception.
my sql is like:
select id, hop_end(created_at, interval '30' second, interval '1' minute), sum(field)... #20 of these sums
from table group by id, hop(created_at, interval '30' second, interval '1' minute)


On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu <[hidden email]> wrote:
Hi,

I have a batch job using blink planner. and got the following error. I was able to successfully run the same job with flink 1.8 on yarn.

I set conf as:
taskmanager.heap.size: 50000m

and flink UI gives me
Last Heartbeat:20-01-22 14:56:25ID:container_1579720108062_0018_01_000020Data Port:41029Free Slots / All Slots:1 / 0CPU Cores:16Physical Memory:62.9 GBJVM Heap Size:10.3 GBFlink Managed Memory:24.9 GB

any suggestions on how to move forward?
Thanks,
Fanbin

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 25 more
Caused by: java.io.IOException: Hash window aggregate map OOM.
at HashWinAggWithKeys$534.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)


--
Best, Jingsong Lee


--
Best, Jingsong Lee


--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: batch job OOM

Fanbin Bu
Jingsong,

Do you have any suggestions to debug the above mentioned IndexOutOfBoundsException error?
Thanks,

Fanbin

On Wed, Jan 22, 2020 at 11:07 PM Fanbin Bu <[hidden email]> wrote:
I got the following error when running another job. any suggestions?

Caused by: java.lang.IndexOutOfBoundsException
at org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:701)
at org.apache.flink.table.dataformat.BinaryRow.getInt(BinaryRow.java:264)
at HashWinAggWithKeys$538.endInput(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:276)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.checkFinished(StreamOneInputProcessor.java:151)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:138)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)

On Wed, Jan 22, 2020 at 8:57 PM Fanbin Bu <[hidden email]> wrote:
Jingsong,

I set the config value to be too large. After I changed it to a smaller number it works now!
thanks you for the help. really appreciate it!

Fanbin

On Wed, Jan 22, 2020 at 8:50 PM Jingsong Li <[hidden email]> wrote:
Fanbin,

Looks like your config is wrong, can you show your config code?

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu <[hidden email]> wrote:
Jingsong,

Great, now i got a different error:

java.lang.NullPointerException: Initial Segment may not be null
	at org.apache.flink.runtime.memory.AbstractPagedOutputView.<init>(AbstractPagedOutputView.java:65)
	at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:49)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.<init>(BytesHashMap.java:522)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:190)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:149)
	at LocalHashWinAggWithKeys$292.open(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:748)

is there any other config i should add?
thanks,
Fanbin

On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu <[hidden email]> wrote:
you beat me to it.
let's me try that.

On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li <[hidden email]> wrote:
Fanbin,

NOTE: you need configure this into TableConfig.

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu <[hidden email]> wrote:
Jingsong,

Thank you for the response.
Since I'm using flink on EMR and the latest version is 1.9 now. the second option is ruled out. but will keep that in mind for future upgrade.

I'm going to try the first option. It's probably a good idea to add that in the doc for example: https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html

Thanks,
Fanbin

On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li <[hidden email]> wrote:
Hi Fanbin,

Thanks for using blink batch mode.

The OOM is caused by the manage memory not enough in Hash aggregation.

There are three options you can choose from:

1.Is your version Flink 1.9? 1.9 still use fix memory configuration. So you need increase hash memory:
- table.exec.resource.hash-agg.memory: 1024 mb

2.In 1.10, we use slot manage memory to dynamic config real operator memory, so operator can use more manage memory, so you don't need configure hash agg memory anymore. You can try 1.10 RC0 [1]

3.We can use sort aggregation to avoid OOM too, but there is no config option now, I created JIRA to track it. [2]


Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu <[hidden email]> wrote:

tried to increase memory: flink run -m yarn-cluster -p 16 -ys 1 -ytm 200000 -yjm 8096 myjar
and still got the same OOM exception.
my sql is like:
select id, hop_end(created_at, interval '30' second, interval '1' minute), sum(field)... #20 of these sums
from table group by id, hop(created_at, interval '30' second, interval '1' minute)


On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu <[hidden email]> wrote:
Hi,

I have a batch job using blink planner. and got the following error. I was able to successfully run the same job with flink 1.8 on yarn.

I set conf as:
taskmanager.heap.size: 50000m

and flink UI gives me
Last Heartbeat:20-01-22 14:56:25ID:container_1579720108062_0018_01_000020Data Port:41029Free Slots / All Slots:1 / 0CPU Cores:16Physical Memory:62.9 GBJVM Heap Size:10.3 GBFlink Managed Memory:24.9 GB

any suggestions on how to move forward?
Thanks,
Fanbin

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 25 more
Caused by: java.io.IOException: Hash window aggregate map OOM.
at HashWinAggWithKeys$534.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)


--
Best, Jingsong Lee


--
Best, Jingsong Lee


--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: batch job OOM

Jingsong Li
Fanbin,

I have no idea now, can you created a JIRA to track it? You can describe complete SQL and some data informations.

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 4:13 PM Fanbin Bu <[hidden email]> wrote:
Jingsong,

Do you have any suggestions to debug the above mentioned IndexOutOfBoundsException error?
Thanks,

Fanbin

On Wed, Jan 22, 2020 at 11:07 PM Fanbin Bu <[hidden email]> wrote:
I got the following error when running another job. any suggestions?

Caused by: java.lang.IndexOutOfBoundsException
at org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:701)
at org.apache.flink.table.dataformat.BinaryRow.getInt(BinaryRow.java:264)
at HashWinAggWithKeys$538.endInput(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:276)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.checkFinished(StreamOneInputProcessor.java:151)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:138)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)

On Wed, Jan 22, 2020 at 8:57 PM Fanbin Bu <[hidden email]> wrote:
Jingsong,

I set the config value to be too large. After I changed it to a smaller number it works now!
thanks you for the help. really appreciate it!

Fanbin

On Wed, Jan 22, 2020 at 8:50 PM Jingsong Li <[hidden email]> wrote:
Fanbin,

Looks like your config is wrong, can you show your config code?

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu <[hidden email]> wrote:
Jingsong,

Great, now i got a different error:

java.lang.NullPointerException: Initial Segment may not be null
	at org.apache.flink.runtime.memory.AbstractPagedOutputView.<init>(AbstractPagedOutputView.java:65)
	at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:49)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.<init>(BytesHashMap.java:522)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:190)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:149)
	at LocalHashWinAggWithKeys$292.open(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:748)

is there any other config i should add?
thanks,
Fanbin

On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu <[hidden email]> wrote:
you beat me to it.
let's me try that.

On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li <[hidden email]> wrote:
Fanbin,

NOTE: you need configure this into TableConfig.

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu <[hidden email]> wrote:
Jingsong,

Thank you for the response.
Since I'm using flink on EMR and the latest version is 1.9 now. the second option is ruled out. but will keep that in mind for future upgrade.

I'm going to try the first option. It's probably a good idea to add that in the doc for example: https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html

Thanks,
Fanbin

On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li <[hidden email]> wrote:
Hi Fanbin,

Thanks for using blink batch mode.

The OOM is caused by the manage memory not enough in Hash aggregation.

There are three options you can choose from:

1.Is your version Flink 1.9? 1.9 still use fix memory configuration. So you need increase hash memory:
- table.exec.resource.hash-agg.memory: 1024 mb

2.In 1.10, we use slot manage memory to dynamic config real operator memory, so operator can use more manage memory, so you don't need configure hash agg memory anymore. You can try 1.10 RC0 [1]

3.We can use sort aggregation to avoid OOM too, but there is no config option now, I created JIRA to track it. [2]


Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu <[hidden email]> wrote:

tried to increase memory: flink run -m yarn-cluster -p 16 -ys 1 -ytm 200000 -yjm 8096 myjar
and still got the same OOM exception.
my sql is like:
select id, hop_end(created_at, interval '30' second, interval '1' minute), sum(field)... #20 of these sums
from table group by id, hop(created_at, interval '30' second, interval '1' minute)


On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu <[hidden email]> wrote:
Hi,

I have a batch job using blink planner. and got the following error. I was able to successfully run the same job with flink 1.8 on yarn.

I set conf as:
taskmanager.heap.size: 50000m

and flink UI gives me
Last Heartbeat:20-01-22 14:56:25ID:container_1579720108062_0018_01_000020Data Port:41029Free Slots / All Slots:1 / 0CPU Cores:16Physical Memory:62.9 GBJVM Heap Size:10.3 GBFlink Managed Memory:24.9 GB

any suggestions on how to move forward?
Thanks,
Fanbin

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 25 more
Caused by: java.io.IOException: Hash window aggregate map OOM.
at HashWinAggWithKeys$534.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)


--
Best, Jingsong Lee


--
Best, Jingsong Lee


--
Best, Jingsong Lee


--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: batch job OOM

phoenixjiangnan
Hi Fanbin,

You can install your own Flink build in AWS EMR, and it frees you from Emr’s release cycles

On Thu, Jan 23, 2020 at 03:36 Jingsong Li <[hidden email]> wrote:
Fanbin,

I have no idea now, can you created a JIRA to track it? You can describe complete SQL and some data informations.

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 4:13 PM Fanbin Bu <[hidden email]> wrote:
Jingsong,

Do you have any suggestions to debug the above mentioned IndexOutOfBoundsException error?
Thanks,

Fanbin

On Wed, Jan 22, 2020 at 11:07 PM Fanbin Bu <[hidden email]> wrote:
I got the following error when running another job. any suggestions?

Caused by: java.lang.IndexOutOfBoundsException
at org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:701)
at org.apache.flink.table.dataformat.BinaryRow.getInt(BinaryRow.java:264)
at HashWinAggWithKeys$538.endInput(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:276)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.checkFinished(StreamOneInputProcessor.java:151)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:138)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)

On Wed, Jan 22, 2020 at 8:57 PM Fanbin Bu <[hidden email]> wrote:
Jingsong,

I set the config value to be too large. After I changed it to a smaller number it works now!
thanks you for the help. really appreciate it!

Fanbin

On Wed, Jan 22, 2020 at 8:50 PM Jingsong Li <[hidden email]> wrote:
Fanbin,

Looks like your config is wrong, can you show your config code?

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu <[hidden email]> wrote:
Jingsong,

Great, now i got a different error:

java.lang.NullPointerException: Initial Segment may not be null
	at org.apache.flink.runtime.memory.AbstractPagedOutputView.<init>(AbstractPagedOutputView.java:65)
	at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:49)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.<init>(BytesHashMap.java:522)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:190)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:149)
	at LocalHashWinAggWithKeys$292.open(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:748)

is there any other config i should add?
thanks,
Fanbin

On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu <[hidden email]> wrote:
you beat me to it.
let's me try that.

On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li <[hidden email]> wrote:
Fanbin,

NOTE: you need configure this into TableConfig.

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu <[hidden email]> wrote:
Jingsong,

Thank you for the response.
Since I'm using flink on EMR and the latest version is 1.9 now. the second option is ruled out. but will keep that in mind for future upgrade.

I'm going to try the first option. It's probably a good idea to add that in the doc for example: https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html

Thanks,
Fanbin

On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li <[hidden email]> wrote:
Hi Fanbin,

Thanks for using blink batch mode.

The OOM is caused by the manage memory not enough in Hash aggregation.

There are three options you can choose from:

1.Is your version Flink 1.9? 1.9 still use fix memory configuration. So you need increase hash memory:
- table.exec.resource.hash-agg.memory: 1024 mb

2.In 1.10, we use slot manage memory to dynamic config real operator memory, so operator can use more manage memory, so you don't need configure hash agg memory anymore. You can try 1.10 RC0 [1]

3.We can use sort aggregation to avoid OOM too, but there is no config option now, I created JIRA to track it. [2]


Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu <[hidden email]> wrote:

tried to increase memory: flink run -m yarn-cluster -p 16 -ys 1 -ytm 200000 -yjm 8096 myjar
and still got the same OOM exception.
my sql is like:
select id, hop_end(created_at, interval '30' second, interval '1' minute), sum(field)... #20 of these sums
from table group by id, hop(created_at, interval '30' second, interval '1' minute)


On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu <[hidden email]> wrote:
Hi,

I have a batch job using blink planner. and got the following error. I was able to successfully run the same job with flink 1.8 on yarn.

I set conf as:
taskmanager.heap.size: 50000m

and flink UI gives me
Last Heartbeat:20-01-22 14:56:25ID:container_1579720108062_0018_01_000020Data Port:41029Free Slots / All Slots:1 / 0CPU Cores:16Physical Memory:62.9 GBJVM Heap Size:10.3 GBFlink Managed Memory:24.9 GB

any suggestions on how to move forward?
Thanks,
Fanbin

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 25 more
Caused by: java.io.IOException: Hash window aggregate map OOM.
at HashWinAggWithKeys$534.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)


--
Best, Jingsong Lee


--
Best, Jingsong Lee


--
Best, Jingsong Lee


--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: batch job OOM

Fanbin Bu
I can build flink 1.10 and install it on to EMR (flink-dist_2.11-1.10.0.jar). but what about other dependencies in my project build.gradle, ie. flink-scala_2.11, flink-json, flink-jdbc... do I continue to use 1.9.0 since there is no 1.10 available?

Thanks,
Fanbin

On Fri, Jan 24, 2020 at 11:39 PM Bowen Li <[hidden email]> wrote:
Hi Fanbin,

You can install your own Flink build in AWS EMR, and it frees you from Emr’s release cycles

On Thu, Jan 23, 2020 at 03:36 Jingsong Li <[hidden email]> wrote:
Fanbin,

I have no idea now, can you created a JIRA to track it? You can describe complete SQL and some data informations.

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 4:13 PM Fanbin Bu <[hidden email]> wrote:
Jingsong,

Do you have any suggestions to debug the above mentioned IndexOutOfBoundsException error?
Thanks,

Fanbin

On Wed, Jan 22, 2020 at 11:07 PM Fanbin Bu <[hidden email]> wrote:
I got the following error when running another job. any suggestions?

Caused by: java.lang.IndexOutOfBoundsException
at org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:701)
at org.apache.flink.table.dataformat.BinaryRow.getInt(BinaryRow.java:264)
at HashWinAggWithKeys$538.endInput(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:276)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.checkFinished(StreamOneInputProcessor.java:151)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:138)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)

On Wed, Jan 22, 2020 at 8:57 PM Fanbin Bu <[hidden email]> wrote:
Jingsong,

I set the config value to be too large. After I changed it to a smaller number it works now!
thanks you for the help. really appreciate it!

Fanbin

On Wed, Jan 22, 2020 at 8:50 PM Jingsong Li <[hidden email]> wrote:
Fanbin,

Looks like your config is wrong, can you show your config code?

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu <[hidden email]> wrote:
Jingsong,

Great, now i got a different error:

java.lang.NullPointerException: Initial Segment may not be null
	at org.apache.flink.runtime.memory.AbstractPagedOutputView.<init>(AbstractPagedOutputView.java:65)
	at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:49)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.<init>(BytesHashMap.java:522)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:190)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:149)
	at LocalHashWinAggWithKeys$292.open(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:748)

is there any other config i should add?
thanks,
Fanbin

On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu <[hidden email]> wrote:
you beat me to it.
let's me try that.

On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li <[hidden email]> wrote:
Fanbin,

NOTE: you need configure this into TableConfig.

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu <[hidden email]> wrote:
Jingsong,

Thank you for the response.
Since I'm using flink on EMR and the latest version is 1.9 now. the second option is ruled out. but will keep that in mind for future upgrade.

I'm going to try the first option. It's probably a good idea to add that in the doc for example: https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html

Thanks,
Fanbin

On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li <[hidden email]> wrote:
Hi Fanbin,

Thanks for using blink batch mode.

The OOM is caused by the manage memory not enough in Hash aggregation.

There are three options you can choose from:

1.Is your version Flink 1.9? 1.9 still use fix memory configuration. So you need increase hash memory:
- table.exec.resource.hash-agg.memory: 1024 mb

2.In 1.10, we use slot manage memory to dynamic config real operator memory, so operator can use more manage memory, so you don't need configure hash agg memory anymore. You can try 1.10 RC0 [1]

3.We can use sort aggregation to avoid OOM too, but there is no config option now, I created JIRA to track it. [2]


Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu <[hidden email]> wrote:

tried to increase memory: flink run -m yarn-cluster -p 16 -ys 1 -ytm 200000 -yjm 8096 myjar
and still got the same OOM exception.
my sql is like:
select id, hop_end(created_at, interval '30' second, interval '1' minute), sum(field)... #20 of these sums
from table group by id, hop(created_at, interval '30' second, interval '1' minute)


On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu <[hidden email]> wrote:
Hi,

I have a batch job using blink planner. and got the following error. I was able to successfully run the same job with flink 1.8 on yarn.

I set conf as:
taskmanager.heap.size: 50000m

and flink UI gives me
Last Heartbeat:20-01-22 14:56:25ID:container_1579720108062_0018_01_000020Data Port:41029Free Slots / All Slots:1 / 0CPU Cores:16Physical Memory:62.9 GBJVM Heap Size:10.3 GBFlink Managed Memory:24.9 GB

any suggestions on how to move forward?
Thanks,
Fanbin

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 25 more
Caused by: java.io.IOException: Hash window aggregate map OOM.
at HashWinAggWithKeys$534.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)


--
Best, Jingsong Lee


--
Best, Jingsong Lee


--
Best, Jingsong Lee


--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: batch job OOM

Arvid Heise-3
Hi Fanbin,

you could use the RC1 of Flink that was created yesterday and use the apache repo https://repository.apache.org/content/repositories/orgapacheflink-1325/org/apache/flink/flink-json/1.10.0/ .
Alternatively, if you build Flink locally with `mvn install`, then you could use mavenLocal() in your gradle.build and feed from that.

Best,

Arvid

On Tue, Jan 28, 2020 at 1:24 AM Fanbin Bu <[hidden email]> wrote:
I can build flink 1.10 and install it on to EMR (flink-dist_2.11-1.10.0.jar). but what about other dependencies in my project build.gradle, ie. flink-scala_2.11, flink-json, flink-jdbc... do I continue to use 1.9.0 since there is no 1.10 available?

Thanks,
Fanbin

On Fri, Jan 24, 2020 at 11:39 PM Bowen Li <[hidden email]> wrote:
Hi Fanbin,

You can install your own Flink build in AWS EMR, and it frees you from Emr’s release cycles

On Thu, Jan 23, 2020 at 03:36 Jingsong Li <[hidden email]> wrote:
Fanbin,

I have no idea now, can you created a JIRA to track it? You can describe complete SQL and some data informations.

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 4:13 PM Fanbin Bu <[hidden email]> wrote:
Jingsong,

Do you have any suggestions to debug the above mentioned IndexOutOfBoundsException error?
Thanks,

Fanbin

On Wed, Jan 22, 2020 at 11:07 PM Fanbin Bu <[hidden email]> wrote:
I got the following error when running another job. any suggestions?

Caused by: java.lang.IndexOutOfBoundsException
at org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:701)
at org.apache.flink.table.dataformat.BinaryRow.getInt(BinaryRow.java:264)
at HashWinAggWithKeys$538.endInput(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:276)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.checkFinished(StreamOneInputProcessor.java:151)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:138)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)

On Wed, Jan 22, 2020 at 8:57 PM Fanbin Bu <[hidden email]> wrote:
Jingsong,

I set the config value to be too large. After I changed it to a smaller number it works now!
thanks you for the help. really appreciate it!

Fanbin

On Wed, Jan 22, 2020 at 8:50 PM Jingsong Li <[hidden email]> wrote:
Fanbin,

Looks like your config is wrong, can you show your config code?

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu <[hidden email]> wrote:
Jingsong,

Great, now i got a different error:

java.lang.NullPointerException: Initial Segment may not be null
	at org.apache.flink.runtime.memory.AbstractPagedOutputView.<init>(AbstractPagedOutputView.java:65)
	at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:49)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.<init>(BytesHashMap.java:522)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:190)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:149)
	at LocalHashWinAggWithKeys$292.open(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:748)

is there any other config i should add?
thanks,
Fanbin

On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu <[hidden email]> wrote:
you beat me to it.
let's me try that.

On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li <[hidden email]> wrote:
Fanbin,

NOTE: you need configure this into TableConfig.

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu <[hidden email]> wrote:
Jingsong,

Thank you for the response.
Since I'm using flink on EMR and the latest version is 1.9 now. the second option is ruled out. but will keep that in mind for future upgrade.

I'm going to try the first option. It's probably a good idea to add that in the doc for example: https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html

Thanks,
Fanbin

On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li <[hidden email]> wrote:
Hi Fanbin,

Thanks for using blink batch mode.

The OOM is caused by the manage memory not enough in Hash aggregation.

There are three options you can choose from:

1.Is your version Flink 1.9? 1.9 still use fix memory configuration. So you need increase hash memory:
- table.exec.resource.hash-agg.memory: 1024 mb

2.In 1.10, we use slot manage memory to dynamic config real operator memory, so operator can use more manage memory, so you don't need configure hash agg memory anymore. You can try 1.10 RC0 [1]

3.We can use sort aggregation to avoid OOM too, but there is no config option now, I created JIRA to track it. [2]


Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu <[hidden email]> wrote:

tried to increase memory: flink run -m yarn-cluster -p 16 -ys 1 -ytm 200000 -yjm 8096 myjar
and still got the same OOM exception.
my sql is like:
select id, hop_end(created_at, interval '30' second, interval '1' minute), sum(field)... #20 of these sums
from table group by id, hop(created_at, interval '30' second, interval '1' minute)


On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu <[hidden email]> wrote:
Hi,

I have a batch job using blink planner. and got the following error. I was able to successfully run the same job with flink 1.8 on yarn.

I set conf as:
taskmanager.heap.size: 50000m

and flink UI gives me
Last Heartbeat:20-01-22 14:56:25ID:container_1579720108062_0018_01_000020Data Port:41029Free Slots / All Slots:1 / 0CPU Cores:16Physical Memory:62.9 GBJVM Heap Size:10.3 GBFlink Managed Memory:24.9 GB

any suggestions on how to move forward?
Thanks,
Fanbin

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 25 more
Caused by: java.io.IOException: Hash window aggregate map OOM.
at HashWinAggWithKeys$534.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)


--
Best, Jingsong Lee


--
Best, Jingsong Lee


--
Best, Jingsong Lee


--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: batch job OOM

Fanbin Bu
Jingsong,

I created https://issues.apache.org/jira/browse/FLINK-15928 to track the issue. Let me know if you need anything else to debug.

Thanks,
Fanbin


On Tue, Jan 28, 2020 at 12:54 AM Arvid Heise <[hidden email]> wrote:
Hi Fanbin,

you could use the RC1 of Flink that was created yesterday and use the apache repo https://repository.apache.org/content/repositories/orgapacheflink-1325/org/apache/flink/flink-json/1.10.0/ .
Alternatively, if you build Flink locally with `mvn install`, then you could use mavenLocal() in your gradle.build and feed from that.

Best,

Arvid

On Tue, Jan 28, 2020 at 1:24 AM Fanbin Bu <[hidden email]> wrote:
I can build flink 1.10 and install it on to EMR (flink-dist_2.11-1.10.0.jar). but what about other dependencies in my project build.gradle, ie. flink-scala_2.11, flink-json, flink-jdbc... do I continue to use 1.9.0 since there is no 1.10 available?

Thanks,
Fanbin

On Fri, Jan 24, 2020 at 11:39 PM Bowen Li <[hidden email]> wrote:
Hi Fanbin,

You can install your own Flink build in AWS EMR, and it frees you from Emr’s release cycles

On Thu, Jan 23, 2020 at 03:36 Jingsong Li <[hidden email]> wrote:
Fanbin,

I have no idea now, can you created a JIRA to track it? You can describe complete SQL and some data informations.

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 4:13 PM Fanbin Bu <[hidden email]> wrote:
Jingsong,

Do you have any suggestions to debug the above mentioned IndexOutOfBoundsException error?
Thanks,

Fanbin

On Wed, Jan 22, 2020 at 11:07 PM Fanbin Bu <[hidden email]> wrote:
I got the following error when running another job. any suggestions?

Caused by: java.lang.IndexOutOfBoundsException
at org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:701)
at org.apache.flink.table.dataformat.BinaryRow.getInt(BinaryRow.java:264)
at HashWinAggWithKeys$538.endInput(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:276)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.checkFinished(StreamOneInputProcessor.java:151)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:138)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)

On Wed, Jan 22, 2020 at 8:57 PM Fanbin Bu <[hidden email]> wrote:
Jingsong,

I set the config value to be too large. After I changed it to a smaller number it works now!
thanks you for the help. really appreciate it!

Fanbin

On Wed, Jan 22, 2020 at 8:50 PM Jingsong Li <[hidden email]> wrote:
Fanbin,

Looks like your config is wrong, can you show your config code?

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu <[hidden email]> wrote:
Jingsong,

Great, now i got a different error:

java.lang.NullPointerException: Initial Segment may not be null
	at org.apache.flink.runtime.memory.AbstractPagedOutputView.<init>(AbstractPagedOutputView.java:65)
	at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:49)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.<init>(BytesHashMap.java:522)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:190)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:149)
	at LocalHashWinAggWithKeys$292.open(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:748)

is there any other config i should add?
thanks,
Fanbin

On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu <[hidden email]> wrote:
you beat me to it.
let's me try that.

On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li <[hidden email]> wrote:
Fanbin,

NOTE: you need configure this into TableConfig.

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu <[hidden email]> wrote:
Jingsong,

Thank you for the response.
Since I'm using flink on EMR and the latest version is 1.9 now. the second option is ruled out. but will keep that in mind for future upgrade.

I'm going to try the first option. It's probably a good idea to add that in the doc for example: https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html

Thanks,
Fanbin

On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li <[hidden email]> wrote:
Hi Fanbin,

Thanks for using blink batch mode.

The OOM is caused by the manage memory not enough in Hash aggregation.

There are three options you can choose from:

1.Is your version Flink 1.9? 1.9 still use fix memory configuration. So you need increase hash memory:
- table.exec.resource.hash-agg.memory: 1024 mb

2.In 1.10, we use slot manage memory to dynamic config real operator memory, so operator can use more manage memory, so you don't need configure hash agg memory anymore. You can try 1.10 RC0 [1]

3.We can use sort aggregation to avoid OOM too, but there is no config option now, I created JIRA to track it. [2]


Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu <[hidden email]> wrote:

tried to increase memory: flink run -m yarn-cluster -p 16 -ys 1 -ytm 200000 -yjm 8096 myjar
and still got the same OOM exception.
my sql is like:
select id, hop_end(created_at, interval '30' second, interval '1' minute), sum(field)... #20 of these sums
from table group by id, hop(created_at, interval '30' second, interval '1' minute)


On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu <[hidden email]> wrote:
Hi,

I have a batch job using blink planner. and got the following error. I was able to successfully run the same job with flink 1.8 on yarn.

I set conf as:
taskmanager.heap.size: 50000m

and flink UI gives me
Last Heartbeat:20-01-22 14:56:25ID:container_1579720108062_0018_01_000020Data Port:41029Free Slots / All Slots:1 / 0CPU Cores:16Physical Memory:62.9 GBJVM Heap Size:10.3 GBFlink Managed Memory:24.9 GB

any suggestions on how to move forward?
Thanks,
Fanbin

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 25 more
Caused by: java.io.IOException: Hash window aggregate map OOM.
at HashWinAggWithKeys$534.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)


--
Best, Jingsong Lee


--
Best, Jingsong Lee


--
Best, Jingsong Lee


--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re: batch job OOM

Jingsong Li
Thanks Fanbin,

I will try to find the bug, and track it.

Best,
Jingsong Lee

On Thu, Feb 6, 2020 at 7:50 AM Fanbin Bu <[hidden email]> wrote:
Jingsong,

I created https://issues.apache.org/jira/browse/FLINK-15928 to track the issue. Let me know if you need anything else to debug.

Thanks,
Fanbin


On Tue, Jan 28, 2020 at 12:54 AM Arvid Heise <[hidden email]> wrote:
Hi Fanbin,

you could use the RC1 of Flink that was created yesterday and use the apache repo https://repository.apache.org/content/repositories/orgapacheflink-1325/org/apache/flink/flink-json/1.10.0/ .
Alternatively, if you build Flink locally with `mvn install`, then you could use mavenLocal() in your gradle.build and feed from that.

Best,

Arvid

On Tue, Jan 28, 2020 at 1:24 AM Fanbin Bu <[hidden email]> wrote:
I can build flink 1.10 and install it on to EMR (flink-dist_2.11-1.10.0.jar). but what about other dependencies in my project build.gradle, ie. flink-scala_2.11, flink-json, flink-jdbc... do I continue to use 1.9.0 since there is no 1.10 available?

Thanks,
Fanbin

On Fri, Jan 24, 2020 at 11:39 PM Bowen Li <[hidden email]> wrote:
Hi Fanbin,

You can install your own Flink build in AWS EMR, and it frees you from Emr’s release cycles

On Thu, Jan 23, 2020 at 03:36 Jingsong Li <[hidden email]> wrote:
Fanbin,

I have no idea now, can you created a JIRA to track it? You can describe complete SQL and some data informations.

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 4:13 PM Fanbin Bu <[hidden email]> wrote:
Jingsong,

Do you have any suggestions to debug the above mentioned IndexOutOfBoundsException error?
Thanks,

Fanbin

On Wed, Jan 22, 2020 at 11:07 PM Fanbin Bu <[hidden email]> wrote:
I got the following error when running another job. any suggestions?

Caused by: java.lang.IndexOutOfBoundsException
at org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:701)
at org.apache.flink.table.dataformat.BinaryRow.getInt(BinaryRow.java:264)
at HashWinAggWithKeys$538.endInput(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:276)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.checkFinished(StreamOneInputProcessor.java:151)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:138)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)

On Wed, Jan 22, 2020 at 8:57 PM Fanbin Bu <[hidden email]> wrote:
Jingsong,

I set the config value to be too large. After I changed it to a smaller number it works now!
thanks you for the help. really appreciate it!

Fanbin

On Wed, Jan 22, 2020 at 8:50 PM Jingsong Li <[hidden email]> wrote:
Fanbin,

Looks like your config is wrong, can you show your config code?

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu <[hidden email]> wrote:
Jingsong,

Great, now i got a different error:

java.lang.NullPointerException: Initial Segment may not be null
	at org.apache.flink.runtime.memory.AbstractPagedOutputView.<init>(AbstractPagedOutputView.java:65)
	at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:49)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.<init>(BytesHashMap.java:522)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:190)
	at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:149)
	at LocalHashWinAggWithKeys$292.open(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:748)

is there any other config i should add?
thanks,
Fanbin

On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu <[hidden email]> wrote:
you beat me to it.
let's me try that.

On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li <[hidden email]> wrote:
Fanbin,

NOTE: you need configure this into TableConfig.

Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu <[hidden email]> wrote:
Jingsong,

Thank you for the response.
Since I'm using flink on EMR and the latest version is 1.9 now. the second option is ruled out. but will keep that in mind for future upgrade.

I'm going to try the first option. It's probably a good idea to add that in the doc for example: https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html

Thanks,
Fanbin

On Wed, Jan 22, 2020 at 7:08 PM Jingsong Li <[hidden email]> wrote:
Hi Fanbin,

Thanks for using blink batch mode.

The OOM is caused by the manage memory not enough in Hash aggregation.

There are three options you can choose from:

1.Is your version Flink 1.9? 1.9 still use fix memory configuration. So you need increase hash memory:
- table.exec.resource.hash-agg.memory: 1024 mb

2.In 1.10, we use slot manage memory to dynamic config real operator memory, so operator can use more manage memory, so you don't need configure hash agg memory anymore. You can try 1.10 RC0 [1]

3.We can use sort aggregation to avoid OOM too, but there is no config option now, I created JIRA to track it. [2]


Best,
Jingsong Lee

On Thu, Jan 23, 2020 at 7:40 AM Fanbin Bu <[hidden email]> wrote:

tried to increase memory: flink run -m yarn-cluster -p 16 -ys 1 -ytm 200000 -yjm 8096 myjar
and still got the same OOM exception.
my sql is like:
select id, hop_end(created_at, interval '30' second, interval '1' minute), sum(field)... #20 of these sums
from table group by id, hop(created_at, interval '30' second, interval '1' minute)


On Wed, Jan 22, 2020 at 3:19 PM Fanbin Bu <[hidden email]> wrote:
Hi,

I have a batch job using blink planner. and got the following error. I was able to successfully run the same job with flink 1.8 on yarn.

I set conf as:
taskmanager.heap.size: 50000m

and flink UI gives me
Last Heartbeat:20-01-22 14:56:25ID:container_1579720108062_0018_01_000020Data Port:41029Free Slots / All Slots:1 / 0CPU Cores:16Physical Memory:62.9 GBJVM Heap Size:10.3 GBFlink Managed Memory:24.9 GB

any suggestions on how to move forward?
Thanks,
Fanbin

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 25 more
Caused by: java.io.IOException: Hash window aggregate map OOM.
at HashWinAggWithKeys$534.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)


--
Best, Jingsong Lee


--
Best, Jingsong Lee


--
Best, Jingsong Lee


--
Best, Jingsong Lee


--
Best, Jingsong Lee