Hi guys,

I'm running Flink on EMR with 2 m3.xlarge instances (16 GB RAM each) and trying to process 3.8 GB of CSV data from S3. I'm surprised that Flink failed with "OutOfMemoryError: Java heap space".

I tried to find the reason:

1) I identified the TaskManager process with the command
   ps aux | grep TaskManager
2) then I built a heap histogram:
   $ jmap -histo:live 19648 | head -n23

    num     #instances         #bytes  class name
   ----------------------------------------------
      1:        131018     3763501304  [B
      2:         61022        7820352  <methodKlass>
      3:         61022        7688456  <constMethodKlass>
      4:          4971        5454408  <constantPoolKlass>
      5:          4966        4582232  <instanceKlassKlass>
      6:          4169        3003104  <constantPoolCacheKlass>
      7:         15696        1447168  [C
      8:          1291         638824  [Ljava.lang.Object;
      9:          5318         506000  java.lang.Class

Do you have any ideas what the reason could be and how it can be fixed? Does Flink use off-heap memory?

Thank you,
Konstantin Kudryavtsev
|
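As a reading aid for the histogram above: the first row says that byte arrays ([B) hold about 3.5 GiB, which is close to the size of the whole input. A minimal sketch that parses such rows and reports the size of the largest entry could look like this. The class and method names (HistoSummary, Row, parse, gib) are hypothetical helpers, not part of jmap or Flink, and the sketch assumes the four-column layout shown in the post.

```java
import java.util.ArrayList;
import java.util.List;

final class HistoSummary {

    /** One data row of `jmap -histo` output. */
    static final class Row {
        final long instances;
        final long bytes;
        final String className;

        Row(long instances, long bytes, String className) {
            this.instances = instances;
            this.bytes = bytes;
            this.className = className;
        }
    }

    /** Parses the data rows and ignores the header and the separator line. */
    static List<Row> parse(String histo) {
        List<Row> rows = new ArrayList<>();
        for (String line : histo.split("\n")) {
            String[] f = line.trim().split("\\s+", 4);
            // Data rows look like "1: 131018 3763501304 [B".
            if (f.length == 4 && f[0].matches("\\d+:")) {
                rows.add(new Row(Long.parseLong(f[1]), Long.parseLong(f[2]), f[3]));
            }
        }
        return rows;
    }

    /** Converts a byte count to GiB. */
    static double gib(long bytes) {
        return bytes / (1024.0 * 1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        String histo =
              " num #instances #bytes class name\n"
            + "----------------------------------------------\n"
            + " 1: 131018 3763501304 [B\n"
            + " 2: 61022 7820352 <methodKlass>\n"
            + " 7: 15696 1447168 [C\n";
        // jmap sorts by bytes, so the first parsed row is the largest consumer.
        Row top = parse(histo).get(0);
        System.out.printf("%s holds %.2f GiB in %d instances%n",
                top.className, gib(top.bytes), top.instances);
    }
}
```

A single class dominating the heap like this usually points at one buffer or collection growing without bound rather than at many small allocations.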
Can you give us a bit more background? What exactly is your program doing?

- Are you running a DataSet program, or a DataStream program?
- Is it one simple source that reads from S3, or are there multiple sources?
- What operations do you apply on the CSV file?
- Are you using Flink's S3 connector, or the Hadoop S3 file system?

Greetings,
Stephan

On Thu, Oct 8, 2015 at 5:58 PM, KOSTIANTYN Kudriavtsev <[hidden email]> wrote:
|
In reply to this post by Kostiantyn Kudriavtsev
Hi Konstantin, Flink uses managed memory only for its internal processing (sorting, hash tables, etc.). If you allocate too much memory in your user code, it can still fail with an OOME. This can also happen for large broadcast sets. 2015-10-08 17:58 GMT+02:00 KOSTIANTYN Kudriavtsev <[hidden email]>:
|
In reply to this post by Stephan Ewen
It's a DataSet program that performs simple filtering, a cross join, and an aggregation. I'm using the Hadoop S3 FileSystem (not EMR's), because Flink's S3 connector doesn't work at all. Currently I have 3 TaskManagers with 5,000 MB each, but I tried different configurations and they all lead to the same exception. Sent from my ZenFone On Oct 8, 2015 12:05 PM, "Stephan Ewen" <[hidden email]> wrote:
|
Can you paste the exception stack trace? On Thu, Oct 8, 2015 at 6:15 PM, KOSTIANTYN Kudriavtsev <[hidden email]> wrote:
|
10/08/2015 16:25:48 CHAIN DataSource (at com.epam.AirSetJobExample$.main(AirSetJobExample.scala:31) (org.apache.flink.api.java.io.TextInputFormat)) -> Filter (Filter at com.epam.AirSetJobExample$.main(AirSetJobExample.scala:31)) -> FlatMap (FlatMap at count(DataSet.scala:523))(1/1) switched to FAILED
java.lang.OutOfMemoryError: Java heap space
    at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:543)
    at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:453)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)

10/08/2015 16:25:48 Job execution switched to status FAILING.
10/08/2015 16:25:48 DataSink (org.apache.flink.api.java.io.DiscardingOutputFormat@58dbb8cf)(1/1) switched to CANCELED
10/08/2015 16:25:48 Job execution switched to status FAILED.
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
    at org.apache.flink.client.program.Client.run(Client.java:413)
    at org.apache.flink.client.program.Client.run(Client.java:356)
    at org.apache.flink.client.program.Client.run(Client.java:349)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
    at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
    at org.apache.flink.api.scala.DataSet.count(DataSet.scala:524)
    at com.epam.AirSetJobExample$.main(AirSetJobExample.scala:35)
    at com.epam.AirSetJobExample.main(AirSetJobExample.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
    at org.apache.flink.client.program.Client.run(Client.java:315)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.OutOfMemoryError: Java heap space
    at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:543)
    at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:453)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)

Thank you,
Konstantin Kudryavtsev

On Thu, Oct 8, 2015 at 12:23 PM, Stephan Ewen <[hidden email]> wrote:
|
What confuses me: I'm running Flink on YARN with the following command: ./yarn-session.sh -n 4 -jm 2096 -tm 5000 so I expect each TaskManager to have almost 5 GB of RAM available, but in the TaskManager panel I found that each TaskManager has the following configuration: Flink Managed Memory: 2460 mb CPU cores: 4 Physical Memory 15046 mb and stats:
In the UI, on the configuration panel, I found:
Thank you, Konstantin Kudryavtsev On Thu, Oct 8, 2015 at 12:29 PM, KOSTIANTYN Kudriavtsev <[hidden email]> wrote:
|
In reply to this post by Kostiantyn Kudriavtsev
Ah, that makes sense! The problem is not in the core runtime, it is in the delimited input format. It probably looks for the line split character and never finds it, so it starts buffering one super large line (gigabytes), which leads to the OOM exception. Can you check whether the line split character and the encoding are properly defined? It would actually be good to define a max line length (sane default + configurable value) that reports when lines seem to exceed a maximum length (which is usually a misconfiguration of the split character). Greetings, Stephan On Thu, Oct 8, 2015 at 6:29 PM, KOSTIANTYN Kudriavtsev <[hidden email]> wrote:
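The maximum-line-length guard proposed above could look roughly like the following. This is a standalone sketch, not Flink's DelimitedInputFormat: the class name BoundedLineReader, its constructor parameters, and the error message are hypothetical, and it assumes a single-byte delimiter and UTF-8 input.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

final class BoundedLineReader {
    private final InputStream in;
    private final int delimiter;
    private final int maxLineLength;

    BoundedLineReader(InputStream in, char delimiter, int maxLineLength) {
        this.in = in;
        this.delimiter = delimiter;
        this.maxLineLength = maxLineLength;
    }

    /** Returns the next line without its delimiter, or null at end of input. */
    String readLine() throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        int b;
        while ((b = in.read()) != -1) {
            if (b == delimiter) {
                return new String(buf.toByteArray(), StandardCharsets.UTF_8);
            }
            // Fail fast instead of buffering an unbounded "line" until the heap is full.
            if (buf.size() >= maxLineLength) {
                throw new IOException("Line exceeds " + maxLineLength
                        + " bytes; is the line delimiter configured correctly?");
            }
            buf.write(b);
        }
        // A trailing line without a final delimiter is still returned.
        return buf.size() > 0 ? new String(buf.toByteArray(), StandardCharsets.UTF_8) : null;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "a,b\nc,d\n".getBytes(StandardCharsets.UTF_8);

        // Correct delimiter: lines are returned one by one.
        BoundedLineReader ok = new BoundedLineReader(new ByteArrayInputStream(data), '\n', 16);
        for (String line = ok.readLine(); line != null; line = ok.readLine()) {
            System.out.println(line);
        }

        // Wrong delimiter: the reader reports the problem instead of buffering everything.
        BoundedLineReader bad = new BoundedLineReader(new ByteArrayInputStream(data), '\r', 4);
        try {
            bad.readLine();
        } catch (IOException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

With a limit like this, a missing or misconfigured split character surfaces as a descriptive error after a bounded number of bytes rather than as an OutOfMemoryError after gigabytes.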
|
Hm, you were right. I checked all the files one by one and found an issue with a line in one of them... It's really unexpected for me, since I ran a Spark job on the same dataset and the "wrong" rows were filtered out without issues. Thanks for the help! Thank you, Konstantin Kudryavtsev On Thu, Oct 8, 2015 at 12:35 PM, Stephan Ewen <[hidden email]> wrote:
|
There is probably a different CSV input format implementation which drops invalid lines (overly long lines). Is that actually the desired behavior, simply dropping malformed input? On Thu, Oct 8, 2015 at 7:12 PM, KOSTIANTYN Kudriavtsev <[hidden email]> wrote:
|
Yes, it is. Sent from my ZenFone On Oct 8, 2015 1:19 PM, "Stephan Ewen" <[hidden email]> wrote:
|
I think that's actually very use-case specific. Your code will never see the malformed record because it is dropped by the input format. Other applications might rely on complete input and would prefer an exception that notifies them about invalid input. Flink's CsvInputFormat has a parameter "lenient" which makes this property configurable. I agree with Stephan that we should add a record-size parameter to the DelimitedInputFormat (which is the basis for the CsvInputFormat). Cheers, Fabian 2015-10-08 19:33 GMT+02:00 KOSTIANTYN Kudriavtsev <[hidden email]>:
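The trade-off between dropping malformed records and failing on them can be illustrated with a small standalone parser. This is only a sketch of the idea behind a "lenient" switch, not Flink's CsvInputFormat: the class LenientCsvParser, its constructor arguments, and the naive comma split (no quoting support) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class LenientCsvParser {
    private final int expectedFields;
    private final boolean lenient;

    LenientCsvParser(int expectedFields, boolean lenient) {
        this.expectedFields = expectedFields;
        this.lenient = lenient;
    }

    /** Parses lines into records; malformed lines are skipped or rejected. */
    List<String[]> parse(List<String> lines) {
        List<String[]> records = new ArrayList<>();
        for (String line : lines) {
            // Limit -1 keeps trailing empty fields, so "a,b," counts as three fields.
            String[] fields = line.split(",", -1);
            if (fields.length != expectedFields) {
                if (lenient) {
                    continue; // silently drop; user code never sees this record
                }
                throw new IllegalArgumentException("Malformed record, expected "
                        + expectedFields + " fields but found " + fields.length + ": " + line);
            }
            records.add(fields);
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a,b,c", "broken", "d,e,f");

        // Lenient mode: the malformed line is dropped.
        System.out.println(new LenientCsvParser(3, true).parse(lines).size() + " records kept");

        // Strict mode: the same input raises an error that names the bad line.
        try {
            new LenientCsvParser(3, false).parse(lines);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Which mode is the better default depends on whether the application can tolerate silently incomplete input, which is the point made in the post above.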
|