Debug OutOfMemory

Kostiantyn Kudriavtsev
Hi guys,

I'm running Flink on EMR with 2 m3.xlarge instances (16 GB RAM each) and trying to process 3.8 GB of CSV data from S3. I'm surprised that Flink failed with OutOfMemoryError: Java heap space.

I tried to find the reason:
1) I identified the TaskManager process with ps aux | grep TaskManager
2) then I built a heap histogram:
$ jmap -histo:live 19648 | head -n23
 num     #instances         #bytes  class name
----------------------------------------------
   1:        131018     3763501304  [B
   2:         61022        7820352  <methodKlass>
   3:         61022        7688456  <constMethodKlass>
   4:          4971        5454408  <constantPoolKlass>
   5:          4966        4582232  <instanceKlassKlass>
   6:          4169        3003104  <constantPoolCacheKlass>
   7:         15696        1447168  [C
   8:          1291         638824  [Ljava.lang.Object;
   9:          5318         506000  java.lang.Class
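
To put the histogram above in perspective, here is a minimal Python sketch (not part of the original post) that parses `jmap -histo` output and prints each class's share of the listed bytes. The function name `parse_histo` is hypothetical, and the sample contains only three of the rows shown above. `[B` is the JVM's internal name for `byte[]`, and `[C` for `char[]`.

```python
import re

# One data row of `jmap -histo` output: rank, instance count, bytes, class name.
ROW = re.compile(r"^\s*(\d+):\s+(\d+)\s+(\d+)\s+(\S.*)$")

def parse_histo(text):
    """Return a list of (class_name, instances, bytes) for every data row."""
    rows = []
    for line in text.splitlines():
        m = ROW.match(line)
        if m:  # header and separator lines do not match and are skipped
            rows.append((m.group(4).strip(), int(m.group(2)), int(m.group(3))))
    return rows

# Three rows copied from the histogram in the post.
SAMPLE = """\
 num     #instances         #bytes  class name
----------------------------------------------
   1:        131018     3763501304  [B
   2:         61022        7820352  <methodKlass>
   7:         15696        1447168  [C
"""

rows = parse_histo(SAMPLE)
total = sum(size for _, _, size in rows)
for name, count, size in rows:
    print(f"{name:16s} {size / 2**30:6.2f} GiB  {100 * size / total:5.1f}%")
```

Run against the full output, this makes it easy to see that the byte arrays alone hold roughly 3.5 GiB, so nearly all of the live heap is raw byte buffers rather than user objects.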


Do you have any idea what the reason could be and how it can be fixed?
Does Flink use off-heap memory?


Thank you,
Konstantin Kudryavtsev

Re: Debug OutOfMemory

Stephan Ewen
Can you give us a bit more background?  What exactly is your program doing? 

  - Are you running a DataSet program, or a DataStream program?
  - Is it one simple source that reads from S3, or are there multiple sources? 
  - What operations do you apply on the CSV file?
  - Are you using Flink's S3 connector, or the Hadoop S3 file system?

Greetings,
Stephan


Re: Debug OutOfMemory

Fabian Hueske-2
In reply to this post by Kostiantyn Kudriavtsev
Hi Konstantin,

Flink uses managed memory only for its internal processing (sorting, hash tables, etc.).
If you allocate too much memory in your user code, it can still fail with an OOME. This can also happen for large broadcast sets.
Can you check how much memory the JVM allocated and how much of it was allocated as managed memory? You'll find that information in the TaskManager log file.

Cheers,
Fabian

Re: Debug OutOfMemory

Kostiantyn Kudriavtsev
In reply to this post by Stephan Ewen

It's a DataSet program that performs simple filtering, a cross join, and an aggregation.

I'm using the Hadoop S3 FileSystem (not EMR's), since Flink's S3 connector doesn't work at all.

Currently I have 3 TaskManagers with 5,000 MB each, but I tried different configurations and they all lead to the same exception.

Re: Debug OutOfMemory

Stephan Ewen
Can you paste the exception stack trace?

Re: Debug OutOfMemory

Kostiantyn Kudriavtsev
10/08/2015 16:25:48     CHAIN DataSource (at com.epam.AirSetJobExample$.main(AirSetJobExample.scala:31) (org.apache.flink.api.java.io.TextInputFormat)) -> Filter (Filter at com.epam.AirSetJobExample$.main(AirSetJobExample.scala:31)) -> FlatMap (FlatMap at count(DataSet.scala:523))(1/1) switched to FAILED
java.lang.OutOfMemoryError: Java heap space
        at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:543)
        at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:453)
        at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)

10/08/2015 16:25:48     Job execution switched to status FAILING.
10/08/2015 16:25:48     DataSink (org.apache.flink.api.java.io.DiscardingOutputFormat@58dbb8cf)(1/1) switched to CANCELED
10/08/2015 16:25:48     Job execution switched to status FAILED.
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
        at org.apache.flink.client.program.Client.run(Client.java:413)
        at org.apache.flink.client.program.Client.run(Client.java:356)
        at org.apache.flink.client.program.Client.run(Client.java:349)
        at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
        at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
        at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
        at org.apache.flink.api.scala.DataSet.count(DataSet.scala:524)
        at com.epam.AirSetJobExample$.main(AirSetJobExample.scala:35)
        at com.epam.AirSetJobExample.main(AirSetJobExample.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
        at org.apache.flink.client.program.Client.run(Client.java:315)
        at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.OutOfMemoryError: Java heap space
        at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:543)
        at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:453)
        at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)


Thank you,
Konstantin Kudryavtsev

Re: Debug OutOfMemory

Kostiantyn Kudriavtsev
What confuses me: I'm running Flink on YARN with the following command:
./yarn-session.sh -n 4 -jm 2096 -tm 5000
so I expect each TaskManager to have almost 5 GB of RAM available, but in the TaskManager panel I found that each TaskManager has the following configuration:
Flink Managed Memory: 2460 MB
CPU cores: 4
Physical Memory: 15046 MB
and these stats:
Memory.heap.used: Current: 248M, Avg: 246M
Memory.flink.used: Current: 2G, Avg: 2G

In the UI's configuration panel I found:
taskmanager.heap.mb: 512
jobmanager.heap.mb: 256
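
As a rough cross-check of the numbers above, the following Python sketch (an editorial addition, not from the thread) estimates how a 5000 MB YARN container might be divided. The cutoff ratio of 0.25, the minimum cutoff of 384 MB, and the managed memory fraction of 0.7 are assumptions about typical defaults in Flink releases of that period; they are not stated anywhere in the thread, so check the configuration reference for your version. The function name is hypothetical.

```python
def estimate_taskmanager_memory(container_mb, cutoff_ratio=0.25,
                                cutoff_min_mb=384, managed_fraction=0.7):
    """Rough estimate of JVM heap and managed memory for one YARN container.

    All default values are assumptions for illustration, not values taken
    from the thread.
    """
    # Part of the container is held back for non-heap JVM memory.
    cutoff_mb = max(cutoff_min_mb, container_mb * cutoff_ratio)
    heap_mb = container_mb - cutoff_mb
    # Managed memory is taken as a fraction of the heap (upper bound).
    managed_mb = heap_mb * managed_fraction
    return heap_mb, managed_mb, heap_mb - managed_mb

heap, managed, user = estimate_taskmanager_memory(5000)  # -tm 5000 from the command above
print(f"JVM heap        ~{heap:.0f} MB")
print(f"managed memory  ~{managed:.0f} MB (upper bound)")
print(f"left for user code and input buffers ~{user:.0f} MB")
```

Under these assumptions the estimate of about 2625 MB of managed memory lands close to the 2460 MB reported in the panel; the gap would be consistent with other reservations being subtracted first. If that is right, only around 1 GB of heap remains outside managed memory for user code and input buffers.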



Thank you,
Konstantin Kudryavtsev

Re: Debug OutOfMemory

Stephan Ewen
In reply to this post by Kostiantyn Kudriavtsev
Ah, that makes sense!

The problem is not in the core runtime; it is in the delimited input format. It probably looks for the line split character and never finds it, so it starts buffering one extremely large line (gigabytes), which leads to the OOM exception.

Can you check whether the line split character and the encoding are properly defined?

It would actually be good to define a maximum line length (a sane default plus a configurable value) and report when a line exceeds it, which is usually a sign of a misconfigured split character.
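
The maximum line length check suggested above can also be run outside Flink to locate a bad record before submitting the job. The following is a minimal standalone Python sketch, not Flink's implementation: the function name `find_long_lines`, the 1 MiB default limit, and the single-byte delimiter are all assumptions made for illustration. It reads the file in fixed-size chunks and only counts bytes, so it does not run into the same unbounded buffering.

```python
import os
import tempfile

def find_long_lines(path, max_len=1 << 20, delimiter=b"\n", chunk_size=1 << 16):
    """Yield (line_number, length) for every record longer than max_len bytes.

    Memory use stays bounded even if the file contains no delimiter at all.
    Assumes a single-byte delimiter.
    """
    line_no, length = 1, 0
    with open(path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            start = 0
            while True:
                pos = chunk.find(delimiter, start)
                if pos < 0:
                    # No more delimiters in this chunk: the record continues.
                    length += len(chunk) - start
                    break
                length += pos - start
                if length > max_len:
                    yield line_no, length
                line_no, length, start = line_no + 1, 0, pos + 1
    if length > max_len:  # last record without a trailing delimiter
        yield line_no, length

# Small demonstration on a temporary file with one oversized middle line.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"a,b,c\n" + b"x" * 50 + b"\n" + b"d,e,f")
print(list(find_long_lines(tmp.name, max_len=20)))  # → [(2, 50)]
os.remove(tmp.name)
```

Running this over each input file with a realistic limit points directly at the file and line number where the delimiter is missing or wrong.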

Greetings,
Stephan


Re: Debug OutOfMemory

Kostiantyn Kudriavtsev
Hm, you were right.

I checked all the files one by one and found an issue with a line in one of them. This was really unexpected for me, because I ran a Spark job on the same dataset and the "wrong" rows were filtered out without issues.

Thanks for the help!

Thank you,
Konstantin Kudryavtsev


Re: Debug OutOfMemory

Stephan Ewen
Spark probably uses a different CSV input format implementation, one that drops invalid lines (for example, lines that are too long).

Is that actually the desired behavior, simply dropping malformed input?


Re: Debug OutOfMemory

Kostiantyn Kudriavtsev

Yes, it is.
I'm checking the number of columns per line to filter out malformed rows.
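
For reference, the column-count check can be sketched in plain Java as below. This is only an illustration, not the actual job code: the class and method names are made up for the example, it assumes a single-character delimiter, and it does not handle quoted fields that contain the delimiter.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class ColumnCountFilter {

    // Hypothetical helper: true if the line has exactly `expected` fields
    // when split on `delimiter`. Counts delimiters instead of allocating
    // a String[] for every line.
    public static boolean hasExpectedColumns(String line, char delimiter, int expected) {
        int fields = 1;
        for (int i = 0; i < line.length(); i++) {
            if (line.charAt(i) == delimiter) {
                fields++;
            }
        }
        return fields == expected;
    }

    // Keeps only the lines with the expected number of columns.
    public static List<String> keepWellFormed(List<String> lines, char delimiter, int expected) {
        return lines.stream()
                .filter(l -> hasExpectedColumns(l, delimiter, expected))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a,b,c", "a,b", "1,2,3,4", "x,y,z");
        // Only the two three-column lines survive the filter.
        System.out.println(keepWellFormed(lines, ',', 3));
    }
}
```

Note that a check like this only runs after a line has been read, so it cannot protect against a single record that is too large to buffer in the first place.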

*Sent from my ZenFone


Re: Debug OutOfMemory

Fabian Hueske-2
I think that's actually very use-case specific.

Your code will never see the malformed record because it is dropped by the input format.
Other applications might rely on complete input and would prefer an exception that notifies them of invalid input.
Flink's CsvInputFormat has a parameter "lenient" which makes this behavior configurable.

I agree with Stephan that we should add a record-size parameter to the DelimitedInputFormat (which is the basis for the CsvInputFormat).
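
To illustrate the record-size idea, here is a minimal sketch in plain Java. It is not Flink code and not a proposal for the actual DelimitedInputFormat change: the class name, constructor parameters, and error message are all assumptions made for the example. The point is only that the reader fails fast once a record grows past a configured limit instead of buffering until the heap is exhausted.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical reader, for illustration only.
class BoundedLineReader {
    private final InputStream in;
    private final int delimiter;
    private final int maxRecordBytes;

    BoundedLineReader(InputStream in, char delimiter, int maxRecordBytes) {
        this.in = in;
        this.delimiter = delimiter;
        this.maxRecordBytes = maxRecordBytes;
    }

    // Returns the next record without its delimiter, or null at end of input.
    // Throws IOException as soon as a record exceeds maxRecordBytes.
    public String nextRecord() throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        int b;
        while ((b = in.read()) != -1) {
            if (b == delimiter) {
                return buf.toString(StandardCharsets.UTF_8.name());
            }
            if (buf.size() >= maxRecordBytes) {
                throw new IOException("Record exceeds " + maxRecordBytes
                        + " bytes; check the line delimiter and the encoding");
            }
            buf.write(b);
        }
        // Last record may not be terminated by a delimiter.
        return buf.size() > 0 ? buf.toString(StandardCharsets.UTF_8.name()) : null;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "ok\nthis line is far too long\n".getBytes(StandardCharsets.UTF_8);
        BoundedLineReader reader = new BoundedLineReader(new ByteArrayInputStream(data), '\n', 8);
        System.out.println(reader.nextRecord());
        try {
            reader.nextRecord();
        } catch (IOException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

A lenient mode could skip ahead to the next delimiter instead of throwing; which of the two is right depends on the application, as discussed above. For real input the stream should be wrapped in a BufferedInputStream, since this sketch reads one byte at a time.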

Cheers, Fabian

2015-10-08 19:33 GMT+02:00 KOSTIANTYN Kudriavtsev <[hidden email]>:

Yes it's,
I'm checking number of columns per line to filter out mailformed

*Sent from my ZenFone

On Oct 8, 2015 1:19 PM, "Stephan Ewen" <[hidden email]> wrote:
There is probably a different CSV input format implementation which drops invalid lines (too long lines).

Is that actually desired behavior, simply dropping malformatted input?

On Thu, Oct 8, 2015 at 7:12 PM, KOSTIANTYN Kudriavtsev <[hidden email]> wrote:
Hm, you was write

I checked all files, one by one and found an issue with a line in one of them... It's really unexpected for me as far as I run spark job on the same dataset and "wrong" rows were filtered out without issues. 

Thanks for help!

Thank you,
Konstantin Kudryavtsev

On Thu, Oct 8, 2015 at 12:35 PM, Stephan Ewen <[hidden email]> wrote:
Ah, that makes sense!

The problem is not in the core runtime, it is in the delimited input format. It probably looks for the line split character and never finds it, so that it starts buffering a super large line (gigabytes) which leads to the OOM exception.

Can you check whether the line split character and the encoding are properly defined?

Would actually be good to define a max line length (sane default + configurable value) that reports when lines seem to extend a maximum length (which is usually a misconfiguration of the split character)

Greetings,
Stephan


On Thu, Oct 8, 2015 at 6:29 PM, KOSTIANTYN Kudriavtsev <[hidden email]> wrote:
10/08/2015 16:25:48     CHAIN DataSource (at com.epam.AirSetJobExample$.main(AirSetJobExample.scala:31) (org.apache.flink.api.java.io.TextInputFormat)) -> Filter (Filter at com.epam.AirSetJobExample$.main(AirSetJobExample.scala:31)) -> FlatMap (FlatMap at count(DataSet.scala:523))(1/1) switched to FAILED
java.lang.OutOfMemoryError: Java heap space
        at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:543)
        at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:453)
        at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)

10/08/2015 16:25:48     Job execution switched to status FAILING.
10/08/2015 16:25:48     DataSink (org.apache.flink.api.java.io.DiscardingOutputFormat@58dbb8cf)(1/1) switched to CANCELED
10/08/2015 16:25:48     Job execution switched to status FAILED.
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
        at org.apache.flink.client.program.Client.run(Client.java:413)
        at org.apache.flink.client.program.Client.run(Client.java:356)
        at org.apache.flink.client.program.Client.run(Client.java:349)
        at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
        at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
        at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
        at org.apache.flink.api.scala.DataSet.count(DataSet.scala:524)
        at com.epam.AirSetJobExample$.main(AirSetJobExample.scala:35)
        at com.epam.AirSetJobExample.main(AirSetJobExample.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
        at org.apache.flink.client.program.Client.run(Client.java:315)
        at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.OutOfMemoryError: Java heap space
        at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:543)
        at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:453)
        at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)


Thank you,
Konstantin Kudryavtsev
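
The innermost frame in the trace above is DelimitedInputFormat.readLine, so one possible reading (an assumption, not something confirmed in this thread) is that a single record keeps growing because the configured line delimiter is never found in the input. The sketch below is not Flink's code; it is a minimal, hypothetical Python illustration of how a delimiter-based reader buffers bytes until it sees the delimiter, with an illustrative max_record_bytes guard that fails fast instead of exhausting the heap. The names read_records, chunk_size and max_record_bytes are made up for this example.

```python
import io


def read_records(stream, delimiter=b"\n", chunk_size=4096, max_record_bytes=1 << 20):
    """Yield delimiter-separated records from a binary stream.

    Bytes are buffered until the delimiter is seen. If no delimiter shows up,
    the buffer would grow without bound, so we stop once it exceeds
    max_record_bytes (a hypothetical safety limit for this sketch).
    """
    pending = bytearray()
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        pending.extend(chunk)
        # Emit every complete record currently sitting in the buffer.
        while True:
            idx = pending.find(delimiter)
            if idx < 0:
                break
            yield bytes(pending[:idx])
            del pending[:idx + len(delimiter)]
        # Whatever is left is an unterminated record; refuse to let it grow forever.
        if len(pending) > max_record_bytes:
            raise ValueError(
                "no delimiter found within %d bytes; check the line delimiter"
                % max_record_bytes
            )
    if pending:
        # Final record without a trailing delimiter.
        yield bytes(pending)
```

If a check like this trips on a sample of the input, the file's line endings or the configured delimiter are worth inspecting before raising the TaskManager heap size.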

On Thu, Oct 8, 2015 at 12:23 PM, Stephan Ewen <[hidden email]> wrote:
Can you paste the exception stack trace?

On Thu, Oct 8, 2015 at 6:15 PM, KOSTIANTYN Kudriavtsev <[hidden email]> wrote:

It's a DataSet program that performs simple filtering, a cross join, and an aggregation.

I'm using the Hadoop S3 FileSystem (not EMR's), since Flink's S3 connector doesn't work at all.

Currently I have 3 TaskManagers with 5,000 MB each, but I tried different configurations and they all lead to the same exception.

