Left join with unbalanced dataset


Left join with unbalanced dataset

LINZ, Arnaud
Hello,

I have a very big dataset A to left-join with a dataset B that is half its size. That is to say, half of A's records will match one record of B, and the other half will be padded with null values.

I used a CoGroup for that, but my batch fails because YARN kills the container due to memory problems.
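(For illustration, a minimal sketch of a CoGroup-based left join along these lines; the tuple types and key positions are made up, not the actual job.)

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

public class CoGroupLeftJoinSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // "Big" dataset A and smaller dataset B, both keyed on field 0.
        DataSet<Tuple2<Long, String>> a = env.fromElements(
                new Tuple2<>(1L, "a1"), new Tuple2<>(2L, "a2"));
        DataSet<Tuple2<Long, String>> b = env.fromElements(
                new Tuple2<>(1L, "b1"));

        DataSet<Tuple3<Long, String, String>> leftJoined = a
                .coGroup(b)
                .where(0)
                .equalTo(0)
                .with(new CoGroupFunction<Tuple2<Long, String>, Tuple2<Long, String>,
                        Tuple3<Long, String, String>>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<Long, String>> left,
                                        Iterable<Tuple2<Long, String>> right,
                                        Collector<Tuple3<Long, String, String>> out) {
                        // Keep the (single) matching B record, if there is one.
                        Tuple2<Long, String> match = null;
                        for (Tuple2<Long, String> r : right) {
                            match = r;
                        }
                        // Emit every A record, padded with null when B had no match.
                        for (Tuple2<Long, String> l : left) {
                            out.collect(new Tuple3<>(l.f0, l.f1,
                                    match == null ? null : match.f1));
                        }
                    }
                });

        leftJoined.print();
    }
}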

I guess that's because one worker gets half of the A dataset (the unmatched records), and that's too much for a single JVM.

Am I right in my diagnosis? Is there a better way to left-join unbalanced datasets?

Best regards,

Arnaud





Re: Left join with unbalanced dataset

Chiwan Park-2
Hi Arnaud,

To join two datasets, the community recommends using the join operation rather than the cogroup operation. For a left join, you can use the leftOuterJoin method. Flink's optimizer decides the distributed join execution strategy based on statistics of the datasets, such as their size. Additionally, you can set a join hint to help the optimizer choose a strategy.

The transformations section [1] of the Flink documentation describes the outer join operations in detail.
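For example, a left outer join with an explicit join hint could look roughly like this (a minimal sketch; the tuple types, key positions and hint are illustrative, and which hints are supported depends on the Flink version):

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

public class LeftOuterJoinSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Long, String>> a = env.fromElements(
                new Tuple2<>(1L, "a1"), new Tuple2<>(2L, "a2"));   // big dataset
        DataSet<Tuple2<Long, String>> b = env.fromElements(
                new Tuple2<>(1L, "b1"));                           // smaller dataset

        DataSet<Tuple3<Long, String, String>> joined = a
                // The join hint is optional; without it the optimizer chooses a strategy.
                .leftOuterJoin(b, JoinHint.REPARTITION_SORT_MERGE)
                .where(0)
                .equalTo(0)
                .with(new JoinFunction<Tuple2<Long, String>, Tuple2<Long, String>,
                        Tuple3<Long, String, String>>() {
                    @Override
                    public Tuple3<Long, String, String> join(Tuple2<Long, String> left,
                                                             Tuple2<Long, String> right) {
                        // right is null for A records without a matching B record.
                        return new Tuple3<>(left.f0, left.f1,
                                right == null ? null : right.f1);
                    }
                });

        joined.print();
    }
}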

I hope this helps.

[1]: https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#transformations

Regards,
Chiwan Park


Re: Left join with unbalanced dataset

Till Rohrmann
Hi Arnaud,

the unmatched elements of A will only end up on the same worker node if they all share the same key. Otherwise, they will be spread evenly across your cluster. However, I would also recommend using Flink's leftOuterJoin.

Cheers,
Till


Re: Left join with unbalanced dataset

Stephan Ewen
Hi!

YARN killing the application seems strange. The memory use that YARN sees should not change even when one node gets a lot of data.

Can you share what version of Flink (plus commit hash) you are using and whether you use off-heap memory or not?

Thanks,
Stephan



RE: Left join with unbalanced dataset

LINZ, Arnaud

Hi,

Thanks, I can't believe I missed the outer join operators… I will try them and keep you informed.

I use the "official" 0.10 release from the Maven repo. The only off-heap memory I use is the memory that HDFS I/O uses (codecs, DFSOutputStream threads…), but I don't have many open files at once, and doubling the amount of memory did not solve the problem.

Arnaud

RE: Left join with unbalanced dataset

LINZ, Arnaud
In reply to this post by Stephan Ewen

Hi,

 

Switching to an outer join did not change the error; nor did balancing the join with another dataset, dividing the parallelism level by 2, or doubling the memory.

Heap size and thread count look OK under JVisualVM, so the problem is elsewhere.

Does Flink use off-heap memory? How can I monitor it?

 

Thanks,

Arnaud

 

10:58:53,384 INFO  org.apache.flink.yarn.YarnJobManager                          - Status of job 8b2ea62e16b82ccc2242bb5549d434a5 (KUBERA-GEO-BRUT2SEGMENT) changed to FAILING.
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
          at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
          at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
          at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
          at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
          at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
          at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
          at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
          ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
          at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
          at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
          at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
          at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
          at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
          at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
          at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
          at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
          at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
          at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
          at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
          at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
          at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
          at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
          at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
          at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
          at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
          at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
          at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)

(…)

10:58:54,423 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@172.21.125.13:40286] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
10:58:54,470 INFO  org.apache.flink.yarn.YarnJobManager                          - Container container_e11_1453202008841_2794_01_000025 is completed with diagnostics: Container [pid=14331,containerID=container_e11_1453202008841_2794_01_000025] is running beyond physical memory limits. Current usage: 8.0 GB of 8 GB physical memory used; 9.1 GB of 16.8 GB virtual memory used. Killing container.
Dump of the process-tree for container_e11_1453202008841_2794_01_000025 :
          |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
          |- 14331 14329 14331 14331 (bash) 0 0 108646400 308 /bin/bash -c /usr/java/default/bin/java -Xms5376m -Xmx5376m -XX:MaxDirectMemorySize=5376m  -Dlog.file=/data/3/hadoop/yarn/log/application_1453202008841_2794/container_e11_1453202008841_2794_01_000025/taskmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1> /data/3/hadoop/yarn/log/application_1453202008841_2794/container_e11_1453202008841_2794_01_000025/taskmanager.out 2> /data/3/hadoop/yarn/log/application_1453202008841_2794/container_e11_1453202008841_2794_01_000025/taskmanager.err --streamingMode batch
          |- 14348 14331 14331 14331 (java) 565583 11395 9636184064 2108473 /usr/java/default/bin/java -Xms5376m -Xmx5376m -XX:MaxDirectMemorySize=5376m -Dlog.file=/data/3/hadoop/yarn/log/application_1453202008841_2794/container_e11_1453202008841_2794_01_000025/taskmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnTaskManagerRunner --configDir . --streamingMode batch

Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143

10:58:54,471 INFO  org.apache.flink.yarn.YarnJobManager

Re: Left join with unbalanced dataset

Stephan Ewen
Hi Arnaud!

Which version of Flink are you using? In 0.10.1, the Netty library version that we use changed its behavior and allocates a lot of off-heap memory. My guess would be that this is the cause. In 1.0-SNAPSHOT that should be fixed, and also in 0.10-SNAPSHOT.

If that turns out to be the cause, the good news is that we started discussing a 0.10.2 maintenance release that should also have a fix for that.

Greetings,
Stephan



RE: Left join with unbalanced dataset

LINZ, Arnaud

Thanks,

I’m using the official 0.10 release. I will try to use the 0.10 snapshot.

 

FYI, setting the heap cut-off ratio to 0.5 led to the following error:

 

12:20:17,313 INFO  org.apache.flink.yarn.YarnJobManager                          - Status of job c55216ab9383fd14e1d287a69a6e0f7e (KUBERA-GEO-BRUT2SEGMENT) changed to FAILING.
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at calculeMinArea(TransfoStage2StageOnTaz.java:153)) -> Map (Key Extractor 1)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: java.lang.OutOfMemoryError: Direct buffer memory
         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
         at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: java.lang.OutOfMemoryError: Direct buffer memory
         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
         at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
         at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
         ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: java.lang.OutOfMemoryError: Direct buffer memory
         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: java.lang.OutOfMemoryError: Direct buffer memory
         at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
         at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
         at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
         at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
         at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
         at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
         at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
         at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
         at io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:737)
         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:310)
         at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
         at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
         at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
         at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
         at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
         at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
         at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
         at java.lang.Thread.run(Thread.java:744)
Caused by: io.netty.handler.codec.DecoderException: java.lang.OutOfMemoryError: Direct buffer memory
         at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:234)
         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
         ... 9 more
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
         at java.nio.Bits.reserveMemory(Bits.java:658)
         at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
         at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
         at io.netty.buffer.PoolArena$DirectArena.newUnpooledChunk(PoolArena.java:651)
         at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:237)
         at io.netty.buffer.PoolArena.allocate(PoolArena.java:215)
         at io.netty.buffer.PoolArena.reallocate(PoolArena.java:358)
         at io.netty.buffer.PooledByteBuf.capacity(PooledByteBuf.java:111)
         at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:251)
         at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:849)
         at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:841)
         at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:831)
         at io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:92)
         at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:228)


Re: Left join with unbalanced dataset

Ufuk Celebi

> On 02 Feb 2016, at 13:28, LINZ, Arnaud <[hidden email]> wrote:
>
> Thanks,
> I’m using the official 0.10 release. I will try to use the 0.10 snapshot.
>  
> FYI, setting the heap cut-off ratio to 0.5 led to the following error:

That’s the error Stephan was referring to. Does the snapshot version fix it for you?

I will prepare a 0.10.2 bug fix release, which includes the fix.

– Ufuk


RE: Left join with unbalanced dataset

LINZ, Arnaud
Hi,

Unfortunately, it still fails, but with a different error (see below).
Note that I did not rebuild and reinstall Flink; I just used a jar compiled against 0.10-SNAPSHOT, submitted as a batch job on the "0.10.0" Flink installation.

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Error at remote task manager 'bt1shli2/172.21.125.27:49771'.
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:241)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:164)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
        at java.lang.Thread.run(Thread.java:744)

Caused by: org.apache.flink.runtime.io.network.partition.ProducerFailedException
        at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:164)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:96)
        at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:279)
        at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:265)
        at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
        at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:279)
        at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:265)
        at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
        at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:279)
        at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:265)
        at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
        at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:279)
        at io.netty.channel.AbstractChannelHandlerContext.access$500(AbstractChannelHandlerContext.java:32)
        at io.netty.channel.AbstractChannelHandlerContext$6.run(AbstractChannelHandlerContext.java:270)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:358)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)



Re: Left join with unbalanced dataset

Ufuk Celebi

> On 02 Feb 2016, at 14:31, LINZ, Arnaud <[hidden email]> wrote:
>
> Hi,
>
> Unfortunately, it still fails, but with a different error (see below).
> Note that I did not rebuild and reinstall Flink; I just used a jar compiled against 0.10-SNAPSHOT, submitted as a batch job on the "0.10.0" Flink installation.

This means that the task at task manager bt1shli2/172.21.125.27:49771 failed during the production of the intermediate data. It’s independent of the memory problem.

Could you please check the logs of that task manager? Sorry for the inconvenience! I hope that we can resolve this shortly.

– Ufuk


RE: Left join with unbalanced dataset

LINZ, Arnaud
Hi,

Running again with more RAM made the treatment go further, but YARN still killed one container for memory consumption. I will experiment with various memory parameters.

How do I retrieve the log of a specific task manager post-mortem? I don't use a permanent Flink/YARN container (it's killed upon batch completion).



Re: Left join with unbalanced dataset

rmetzger0
Hi Arnaud,

you can retrieve the logs of a YARN application by calling "yarn logs -applicationId <id>".

It's going to output the logs of all TaskManagers and the JobManager in one stream. I would pipe the output into a file and then search for the position where the log of the failing TaskManager starts.

On Tue, Feb 2, 2016 at 3:15 PM, LINZ, Arnaud <[hidden email]> wrote:
Hi,

Running again with more RAM made the treatement go further, but Yarn still killed one container for memory consumption. I will experiment various memory parameters.

How do I retrieve the log of a specific task manager post-mortem? I don't use a permanent Flink/Yarn container (it's killed upon batch completion).


-----Original Message-----
From: Ufuk Celebi [mailto:[hidden email]]
Sent: Tuesday, 2 February 2016 14:41
This means that the task at task manager bt1shli2/172.21.125.27:49771 failed during the production of the intermediate data. It’s independent of the memory problem.

Could you please check the logs of that task manager? Sorry for the inconvenience! I hope that we can resolve this shortly.

– Ufuk



Re: Left join with unbalanced dataset

Ufuk Celebi
In reply to this post by LINZ, Arnaud

> On 02 Feb 2016, at 15:15, LINZ, Arnaud <[hidden email]> wrote:
>
> Hi,
>
> Running again with more RAM made the treatement go further, but Yarn still killed one container for memory consumption. I will experiment various memory parameters.

OK, the killing of the container probably triggered the RemoteTransportException.

Can you tell me how many containers you are using, how much physical memory the machines have and how much the containers get?

You can monitor memory usage by setting

taskmanager.debug.memory.startLogThread: true

in the config. This will periodically log the memory consumption to the task manager logs. Can you try this and check the logs for the memory consumption?

You can also have a look at it in the web frontend under the Task Manager tab.

– Ufuk


RE: Left join with unbalanced dataset

LINZ, Arnaud
Thanks,

Giving the batch an outrageous amount of memory with a 0.5 heap cut-off ratio makes it succeed.

I've figured out which dataset is consuming the most memory: I have a big join that multiplies the size of the input set before a group reduce.
I would like to optimize my code by reducing the join output size at the join itself.

The outline of the job is:
DataSet A = (K1, K2, V1) where (K1, K2) is the key. A is huge.
DataSet B = (K1, V2) where there are multiple values V2 for the same K1 (say 5).

I do something like: A.join(B).on(K1).groupBy(K1,K2).reduce()
As B contains 5 lines for one key of A, A.join(B) is 5 times the size of A.

Flink does not start the reduce operation until all the joined lines have been created (the memory bottleneck is during the collection of all those lines), but theoretically it would be possible.
I see no "join group" operator that could do something like "A.groupBy(K1,K2).join(B).on(K1).reduce()".

Is there a way to do this?

The other way I see is to load B in memory on all nodes and use a hash map during the reduce to recreate the A.join(B) lines. B is not that small, but I think it would still save RAM.
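
A minimal sketch of that broadcast idea (assuming String keys and Long values; the class name, the aggregation and the "smallSideB" broadcast name are placeholders, not my real code):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Group A on (K1, K2) first, broadcast B, and do the "join" inside the group reduce
// with a hash map, so that the 5x-sized A.join(B) is never materialized.
public class GroupReduceWithBroadcastB
        extends RichGroupReduceFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>> {

    // K1 -> all V2 values of B for that key, built once per task from the broadcast set
    private Map<String, List<Long>> bByKey;

    @Override
    public void open(Configuration parameters) {
        List<Tuple2<String, Long>> b = getRuntimeContext().getBroadcastVariable("smallSideB");
        bByKey = new HashMap<String, List<Long>>();
        for (Tuple2<String, Long> rec : b) {
            List<Long> values = bByKey.get(rec.f0);
            if (values == null) {
                values = new ArrayList<Long>();
                bByKey.put(rec.f0, values);
            }
            values.add(rec.f1);
        }
    }

    @Override
    public void reduce(Iterable<Tuple3<String, String, Long>> group,
                       Collector<Tuple3<String, String, Long>> out) {
        String k1 = null;
        String k2 = null;
        long aggregate = 0L; // placeholder for the real aggregation of V1
        for (Tuple3<String, String, Long> rec : group) {
            k1 = rec.f0;
            k2 = rec.f1;
            aggregate += rec.f2;
        }
        List<Long> v2s = bByKey.get(k1);
        if (v2s == null) {
            // left-join semantics: no matching K1 in B
            out.collect(new Tuple3<String, String, Long>(k1, k2, aggregate));
            return;
        }
        for (Long v2 : v2s) {
            // placeholder combination of the A aggregate with each matching V2
            out.collect(new Tuple3<String, String, Long>(k1, k2, aggregate + v2));
        }
    }
}

// Wiring:
//   a.groupBy(0, 1)
//    .reduceGroup(new GroupReduceWithBroadcastB())
//    .withBroadcastSet(b, "smallSideB");

Alternatively, keeping the join, I suppose a broadcast join hint (e.g. A.join(B, JoinHint.BROADCAST_HASH_SECOND).where(...).equalTo(...)) would avoid shuffling A, but it would not remove the 5x blow-up before the group reduce.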

Best regards,
Arnaud

-----Original Message-----
From: Ufuk Celebi [mailto:[hidden email]]
Sent: Tuesday, 2 February 2016 15:27
To: [hidden email]
Subject: Re: Left join with unbalanced dataset


> On 02 Feb 2016, at 15:15, LINZ, Arnaud <[hidden email]> wrote:
>
> Hi,
>
> Running again with more RAM made the treatement go further, but Yarn still killed one container for memory consumption. I will experiment various memory parameters.

OK, the killing of the container probably triggered the RemoteTransportException.

Can you tell me how many containers you are using, how much phyiscal memory the machines have and how much the containers get?

You can monitor memory usage by setting

taskmanager.debug.memory.startLogThread: true

in the config. This will periodically log the memory consumption to the task manager logs. Can you try this and check the logs for the memory consumption?

You can also have a look at it in the web frontend under the Task Manager tab.

– Ufuk



Re: Left join with unbalanced dataset

Gábor Gévay
Hello Arnaud,

> Flink does not start the reduce operation until all lines have
> been created (memory bottleneck is during the collection
> of all lines) ; but theorically it is possible.

The problem that `S.groupBy(...).reduce(...)` needs to fully
materialize S comes from the fact that the implementation of reduce is
currently sort based. But this PR will partially solve this:
https://github.com/apache/flink/pull/1517
It implements a hash-based combiner, which will not materialize the
input, but instead needs memory proportional to only the number of
different keys occurring. You might want to try rebasing to this PR,
to see whether it improves your situation.

(I also plan to extend this implementation to the actual reduce after
the combine, but I'm not sure when I will get around to that.)
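
Just to illustrate the principle with a toy sketch (plain Java, not the actual code of the PR): a hash-based combine keeps one running aggregate per key, so its memory footprint grows with the number of distinct keys rather than with the number of records.

import java.util.HashMap;
import java.util.Map;

// Toy illustration only (not the code of PR #1517): instead of buffering and
// sorting every input record, keep one running aggregate per key.
public class HashCombineSketch {

    public static Map<String, Long> combine(Iterable<String[]> keyValueRecords) {
        Map<String, Long> aggregates = new HashMap<String, Long>();
        for (String[] record : keyValueRecords) {
            String key = record[0];
            long value = Long.parseLong(record[1]);
            Long current = aggregates.get(key);
            aggregates.put(key, current == null ? value : current + value);
        }
        // one entry per distinct key; these partial aggregates are what gets
        // handed on to the final reduce
        return aggregates;
    }
}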

Best,
Gábor



2016-02-02 16:56 GMT+01:00 LINZ, Arnaud <[hidden email]>:

> Thanks,
>
> Giving the batch an outrageous amount of memory with a 0.5 heap ratio leads to the success of the batch.
>
> I've figured out which dataset is consuming the most memory, I have a big join that demultiplies the size of the input set before a group reduce.
> I am willing to optimize my code by reducing the join output size upon junction.
>
> The outline of the treatment is :
> DataSet A = (K1, K2, V1) where (K1,K2) is the key. A is huge.
> DataSet B = (K1, V2)  where there are multiple values V2 for the same K1 (say 5)
>
> I do something like : A.join(B).on(K1).groupBy(K1,K2).reduce()
> As B contains 5 lines for one key of A, A.join(B) is 5 times the size of A.
>
> Flink does not start the reduce operation until all lines have been created (memory bottleneck is during the collection of all lines) ; but theorically it is possible.
> I see no "join group" operator that could do something like "A.groupBy(K1,K2).join(B).on(K1).reduce()"
>
> Is there a way to do this ?
>
> The other way I see is to load B in memory for all nodes and use a hash map upon reduction to get all A.join(B) lines. B is not that small, but I think it will still save RAM.
>
> Best regards,
> Arnaud
>
> -----Original Message-----
> From: Ufuk Celebi [mailto:[hidden email]]
> Sent: Tuesday, 2 February 2016 15:27
> To: [hidden email]
> Subject: Re: Left join with unbalanced dataset
>
>
>> On 02 Feb 2016, at 15:15, LINZ, Arnaud <[hidden email]> wrote:
>>
>> Hi,
>>
>> Running again with more RAM made the treatement go further, but Yarn still killed one container for memory consumption. I will experiment various memory parameters.
>
> OK, the killing of the container probably triggered the RemoteTransportException.
>
> Can you tell me how many containers you are using, how much phyiscal memory the machines have and how much the containers get?
>
> You can monitor memory usage by setting
>
> taskmanager.debug.memory.startLogThread: true
>
> in the config. This will periodically log the memory consumption to the task manager logs. Can you try this and check the logs for the memory consumption?
>
> You can also have a look at it in the web frontend under the Task Manager tab.
>
> – Ufuk
>
>

Re: Left join with unbalanced dataset

Stephan Ewen
To make sure this discussion does not go in a wrong direction:

There is no issue here with data size or memory management. The memory management for sorting and hashing works, and Flink handles the spilling correctly, etc.

The issue here is different:
   - One possible reason is that the network stack (specifically the Netty library) allocates too much direct (= off-heap) memory for buffering the TCP connections.
   - Another reason could be leaky behavior in Hadoop's HDFS code.


@Arnaud: We need the full log of the TaskManager that initially experiences that failure, then we can look into this. Best would be with activated memory logging, like suggested by Ufuk.

Best,
Stephan


On Tue, Feb 2, 2016 at 6:21 PM, Gábor Gévay <[hidden email]> wrote:
Hello Arnaud,

> Flink does not start the reduce operation until all lines have
> been created (memory bottleneck is during the collection
> of all lines) ; but theorically it is possible.

The problem that `S.groupBy(...).reduce(...)` needs to fully
materialize S comes from the fact that the implementation of reduce is
currently sort based. But this PR will partially solve this:
https://github.com/apache/flink/pull/1517
It implements a hash-based combiner, which will not materialize the
input, but instead needs memory proportional to only the number of
different keys occurring. You might want to try rebasing to this PR,
to see whether it improves your situation.

(I also plan to extend this implementation to the actual reduce after
the combine, but I'm not sure when will I get around to that.)

Best,
Gábor



2016-02-02 16:56 GMT+01:00 LINZ, Arnaud <[hidden email]>:
> Thanks,
>
> Giving the batch an outrageous amount of memory with a 0.5 heap ratio leads to the success of the batch.
>
> I've figured out which dataset is consuming the most memory, I have a big join that demultiplies the size of the input set before a group reduce.
> I am willing to optimize my code by reducing the join output size upon junction.
>
> The outline of the treatment is :
> DataSet A = (K1, K2, V1) where (K1,K2) is the key. A is huge.
> DataSet B = (K1, V2)  where there are multiple values V2 for the same K1 (say 5)
>
> I do something like : A.join(B).on(K1).groupBy(K1,K2).reduce()
> As B contains 5 lines for one key of A, A.join(B) is 5 times the size of A.
>
> Flink does not start the reduce operation until all lines have been created (memory bottleneck is during the collection of all lines) ; but theorically it is possible.
> I see no "join group" operator that could do something like "A.groupBy(K1,K2).join(B).on(K1).reduce()"
>
> Is there a way to do this ?
>
> The other way I see is to load B in memory for all nodes and use a hash map upon reduction to get all A.join(B) lines. B is not that small, but I think it will still save RAM.
>
> Best regards,
> Arnaud
>
> -----Original Message-----
> From: Ufuk Celebi [mailto:[hidden email]]
> Sent: Tuesday, 2 February 2016 15:27
> To: [hidden email]
> Subject: Re: Left join with unbalanced dataset
>
>
>> On 02 Feb 2016, at 15:15, LINZ, Arnaud <[hidden email]> wrote:
>>
>> Hi,
>>
>> Running again with more RAM made the treatement go further, but Yarn still killed one container for memory consumption. I will experiment various memory parameters.
>
> OK, the killing of the container probably triggered the RemoteTransportException.
>
> Can you tell me how many containers you are using, how much phyiscal memory the machines have and how much the containers get?
>
> You can monitor memory usage by setting
>
> taskmanager.debug.memory.startLogThread: true
>
> in the config. This will periodically log the memory consumption to the task manager logs. Can you try this and check the logs for the memory consumption?
>
> You can also have a look at it in the web frontend under the Task Manager tab.
>
> – Ufuk
>
>

RE: Left join with unbalanced dataset

LINZ, Arnaud

Hi,

 

I see nothing wrong in the log of the killed container (it's in fact strange that it fails with an I/O channel closure before it is killed by YARN), but I'll post new logs with memory debug enabled as a web download within the day.

In the meantime, a log extract:

 

Container: container_e11_1453202008841_2868_01_000018 on h1r1dn06.bpa.bouyguestelecom.fr_45454

================================================================================================

 

15:04:01,234 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   - --------------------------------------------------------------------------------

15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Starting YARN TaskManager (Version: 0.10.0, Rev:ab2cca4, Date:10.11.2015 @ 13:50:14 UTC)

15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Current user: datcrypt

15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.7/24.45-b08

15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Maximum heap size: 6900 MiBytes

15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  JAVA_HOME: /usr/java/default

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Hadoop version: 2.6.0

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  JVM Options:

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Xms7200m

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Xmx7200m

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -XX:MaxDirectMemorySize=7200m

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Dlogback.configurationFile=file:logback.xml

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Dlog4j.configuration=file:log4j.properties

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Program Arguments:

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     --configDir

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     .

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     --streamingMode

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     batch

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   - --------------------------------------------------------------------------------

15:04:02,215 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Starting TaskManager actor

15:04:02,224 INFO  org.apache.flink.runtime.io.network.netty.NettyConfig         - NettyConfig [server address: bt1shlhr/172.21.125.16, server port: 47002, memory segment size (bytes): 32768, transport type: NIO, number of server threads: 0 (use Netty's default), number of client threads: 0 (use Netty's default), server connect backlog: 0 (use Netty's default), client connect timeout (sec): 120, send/receive buffer size (bytes): 0 (use Netty's default)]

15:04:02,226 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Messages between TaskManager and JobManager have a max timeout of 100000 milliseconds

 

15:04:02,970 INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 1024 MB for network buffer pool (number of memory segments: 32768, bytes per segment: 32768).

15:04:03,527 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Using 0.7 of the currently free heap space for Flink managed heap memory (4099 MB).

15:04:06,250 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /data/1/hadoop/yarn/local/usercache/datcrypt/appcache/application_1453202008841_2868/flink-io-5cc3aa50-6723-460e-a722-800dd908e9e8 for spill files.

15:04:06,429 INFO  org.apache.flink.yarn.YarnTaskManager                         - TaskManager data connection information: h1r1dn06.bpa.bouyguestelecom.fr (dataPort=47002)

15:04:06,430 INFO  org.apache.flink.yarn.YarnTaskManager                         - TaskManager has 2 task slot(s).

15:04:06,431 INFO  org.apache.flink.yarn.YarnTaskManager                         - Memory usage stats: [HEAP: 5186/6900/6900 MB, NON HEAP: 25/50/130 MB (used/committed/max)]

15:04:06,438 INFO  org.apache.flink.yarn.YarnTaskManager                         - Trying to register at JobManager akka.tcp://flink@172.21.125.31:36518/user/jobmanager (attempt 1, timeout: 500 milliseconds)

15:04:06,591 INFO  org.apache.flink.yarn.YarnTaskManager                         - Successful registration at JobManager (akka.tcp://flink@172.21.125.31:36518/user/jobmanager), starting network stack and library cache.

 

15:17:22,191 INFO  org.apache.flink.yarn.YarnTaskManager                         - Unregistering task and sending final execution state FINISHED to JobManager for task DataSink (Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (c9dc588ceb209d98fd08b5144a59adfc)

15:17:22,196 INFO  org.apache.flink.runtime.taskmanager.Task                     - DataSink (Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (76/95) switched to FINISHED

15:17:22,197 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for DataSink (Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (76/95)

15:17:22,197 INFO  org.apache.flink.yarn.YarnTaskManager                         - Unregistering task and sending final execution state FINISHED to JobManager for task DataSink (Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (0c1c027e2ca5111e3e54c98b6d7265d7)

15:22:47,592 ERROR org.apache.flink.yarn.YarnTaskManagerRunner                   - RECEIVED SIGNAL 15: SIGTERM

15:22:47,608 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (89/95)

java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)

        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)

        at java.lang.Thread.run(Thread.java:744)

Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)

        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)

        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)

        ... 3 more

Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)

Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)

        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)

        ... 8 more

15:22:47,608 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (87/95)

java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)

        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)

        at java.lang.Thread.run(Thread.java:744)

Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)

        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)

        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)

        ... 3 more

Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)

Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)

        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)

        ... 8 more

15:22:47,617 INFO  org.apache.flink.runtime.taskmanager.Task                     - CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (89/95) switched to FAILED with exception.

java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)

        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)

        at java.lang.Thread.run(Thread.java:744)

Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)

        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)

        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)

        ... 3 more

Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)

Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)

        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)

        ... 8 more

15:22:47,619 INFO  org.apache.flink.runtime.taskmanager.Task                     - CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (87/95) switched to FAILED with exception.

java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)

        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)

        at java.lang.Thread.run(Thread.java:744)

Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)

        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)

        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)

        ... 3 more

Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)

Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)

        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)

        ... 8 more

15:22:47,627 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (87/95)

15:22:47,627 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (89/95)

15:22:47,664 INFO  org.apache.flink.yarn.YarnTaskManager                         - Unregistering task and sending final execution state FAILED to JobManager for task CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (8d6b8b11714a27ac1ca83b39bdee577f)

15:22:47,738 INFO  org.apache.flink.yarn.YarnTaskManager                         - Unregistering task and sending final execution state FAILED to JobManager for task CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (73c6c2f15159dcb134e3899064a30f33)

15:22:47,841 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN GroupReduce (GroupReduce at calculeMinArea(TransfoStage2StageOnTaz.java:159)) -> Map (Key Extractor 1) (88/95)

com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentReadRequest@26707c34

        at com.esotericsoftware.kryo.io.Input.fill(Input.java:148)

        at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:74)

        at com.esotericsoftware.kryo.io.Input.readLong_slow(Input.java:756)

        at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)

        at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)

        at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeLongField.read(UnsafeCacheFields.java:160)

        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)

        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:222)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:236)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)

        at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:86)

        at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:151)

        at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:85)

        at org.apache.flink.runtime.util.ReusingKeyGroupedIterator$ValuesIterator.hasNext(ReusingKeyGroupedIterator.java:186)

        at org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator.hasNext(TupleUnwrappingIterator.java:49)

        at com.bouygtel.kubera.processor.stage.TransfoStage2StageOnTaz$6.reduce(TransfoStage2StageOnTaz.java:170)

        at org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:101)

        at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:118)

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)

        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)

        at java.lang.Thread.run(Thread.java:744)

Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentReadRequest@26707c34

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.readBlock(AsynchronousBlockReader.java:75)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.readBlock(AsynchronousBlockReader.java:43)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.sendReadRequest(ChannelReaderInputView.java:259)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.nextSegment(ChannelReaderInputView.java:224)

        at org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:159)

        at org.apache.flink.runtime.memory.AbstractPagedInputView.read(AbstractPagedInputView.java:213)

        at org.apache.flink.api.java.typeutils.runtime.DataInputViewStream.read(DataInputViewStream.java:68)

        at com.esotericsoftware.kryo.io.Input.fill(Input.java:146)

        ... 25 more

 

     

(...)

______________________

15:22:51,798 INFO  org.apache.flink.yarn.YarnJobManager                          - Container container_e11_1453202008841_2868_01_000018 is completed with diagnostics: Container [pid=14548,containerID=container_e11_1453202008841_2868_01_000018] is running beyond physical memory limits. Current usage: 12.1 GB of 12 GB physical memory used; 13.0 GB of 25.2 GB virtual memory used. Killing container.

Dump of the process-tree for container_e11_1453202008841_2868_01_000018 :

        |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE

        |- 14548 14542 14548 14548 (bash) 0 0 108646400 310 /bin/bash -c /usr/java/default/bin/java -Xms7200m -Xmx7200m -XX:MaxDirectMemorySize=7200m  -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1> /data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.out 2> /data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.err --streamingMode batch

        |- 14558 14548 14548 14548 (java) 631070 15142 13881634816 3163462 /usr/java/default/bin/java -Xms7200m -Xmx7200m -XX:MaxDirectMemorySize=7200m -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnTaskManagerRunner --configDir . --streamingMode batch

 

Container killed on request. Exit code is 143

Container exited with a non-zero exit code 143

 

   

 

 

From: [hidden email] [mailto:[hidden email]] On behalf of Stephan Ewen
Sent: Tuesday, 2 February 2016 20:20
To: [hidden email]
Subject: Re: Left join with unbalanced dataset

 

To make sure this discussion does not go in a wrong direction:

 

There is no issue here with data size, or memory management. The MemoryManagement for sorting and hashing works, and Flink handles the spilling correctly, etc.

 

The issue here is different

   - One possible reason is that the network stack (specifically the Netty library) allocates too much direct (= off heap) memory for buffering the TCP connections.

   - Another reason could be leaky behavior in Hadoop's HDFS code.

 

 

@Arnaud: We need the full log of the TaskManager that initially experiences that failure, then we can look into this. Best would be with activated memory logging, like suggested by Ufuk.

 

Best,

Stephan

 





Re: Left join with unbalanced dataset

Stephan Ewen
Hi!

I think the closed channel is actually an effect of the process kill. Before the exception, you can see "15:22:47,592 ERROR org.apache.flink.yarn.YarnTaskManagerRunner - RECEIVED SIGNAL 15: SIGTERM" in the log, which means that UNIX is killing the process.
I assume that the first thing that happens is that UNIX closes the open file handles, while the JVM shutdown hooks are still in progress. Hence the exception.

So, the root cause is still the YARN memory killer.

The log comes from release version 0.10.0.
The Netty fix came into Flink after version 0.10.1 - so it is currently only in 0.10-SNAPSHOT (and will be in 0.10.2 in a couple of days).

Greetings,
Stephan


On Wed, Feb 3, 2016 at 10:11 AM, LINZ, Arnaud <[hidden email]> wrote:

Hi,

 

I see nothing wrong in the log of the killed container (it’s in fact strange that it fails with I/O channel closure before it is killed by yarn), but I’ll post new logs with memory debug as a web download within the day.

 

In the mean time, log extract :

 

Container: container_e11_1453202008841_2868_01_000018 on h1r1dn06.bpa.bouyguestelecom.fr_45454

================================================================================================

 

15:04:01,234 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   - --------------------------------------------------------------------------------

15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Starting YARN TaskManager (Version: 0.10.0, Rev:ab2cca4, Date:10.11.2015 @ 13:50:14 UTC)

15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Current user: datcrypt

15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.7/24.45-b08

15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Maximum heap size: 6900 MiBytes

15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  JAVA_HOME: /usr/java/default

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Hadoop version: 2.6.0

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  JVM Options:

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Xms7200m

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Xmx7200m

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -XX:MaxDirectMemorySize=7200m

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Dlogback.configurationFile=file:logback.xml

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Dlog4j.configuration=file:log4j.properties

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Program Arguments:

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     --configDir

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     .

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     --streamingMode

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     batch

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   - --------------------------------------------------------------------------------

15:04:02,215 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Starting TaskManager actor

15:04:02,224 INFO  org.apache.flink.runtime.io.network.netty.NettyConfig         - NettyConfig [server address: bt1shlhr/172.21.125.16, server port: 47002, memory segment size (bytes): 32768, transport type: NIO, number of server threads: 0 (use Netty's default), number of client threads: 0 (use Netty's default), server connect backlog: 0 (use Netty's default), client connect timeout (sec): 120, send/receive buffer size (bytes): 0 (use Netty's default)]

15:04:02,226 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Messages between TaskManager and JobManager have a max timeout of 100000 milliseconds

 

15:04:02,970 INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 1024 MB for network buffer pool (number of memory segments: 32768, bytes per segment: 32768).

15:04:03,527 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Using 0.7 of the currently free heap space for Flink managed heap memory (4099 MB).

15:04:06,250 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /data/1/hadoop/yarn/local/usercache/datcrypt/appcache/application_1453202008841_2868/flink-io-5cc3aa50-6723-460e-a722-800dd908e9e8 for spill files.

15:04:06,429 INFO  org.apache.flink.yarn.YarnTaskManager                         - TaskManager data connection information: h1r1dn06.bpa.bouyguestelecom.fr (dataPort=47002)

15:04:06,430 INFO  org.apache.flink.yarn.YarnTaskManager                         - TaskManager has 2 task slot(s).

15:04:06,431 INFO  org.apache.flink.yarn.YarnTaskManager                         - Memory usage stats: [HEAP: 5186/6900/6900 MB, NON HEAP: 25/50/130 MB (used/committed/max)]

15:04:06,438 INFO  org.apache.flink.yarn.YarnTaskManager                         - Trying to register at JobManager akka.tcp://flink@172.21.125.31:36518/user/jobmanager (attempt 1, timeout: 500 milliseconds)

15:04:06,591 INFO  org.apache.flink.yarn.YarnTaskManager                         - Successful registration at JobManager (akka.tcp://flink@172.21.125.31:36518/user/jobmanager), starting network stack and library cache.

 

15:17:22,191 INFO  org.apache.flink.yarn.YarnTaskManager                         - Unregistering task and sending final execution state FINISHED to JobManager for task DataSink (Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (c9dc588ceb209d98fd08b5144a59adfc)

15:17:22,196 INFO  org.apache.flink.runtime.taskmanager.Task                     - DataSink (Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (76/95) switched to FINISHED

15:17:22,197 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for DataSink (Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (76/95)

15:17:22,197 INFO  org.apache.flink.yarn.YarnTaskManager                         - Unregistering task and sending final execution state FINISHED to JobManager for task DataSink (Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (0c1c027e2ca5111e3e54c98b6d7265d7)

15:22:47,592 ERROR org.apache.flink.yarn.YarnTaskManagerRunner                   - RECEIVED SIGNAL 15: SIGTERM

15:22:47,608 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (89/95)

java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)

        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)

        at java.lang.Thread.run(Thread.java:744)

Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)

        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)

        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)

        ... 3 more

Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)

Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)

        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)

        ... 8 more

15:22:47,608 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (87/95)

java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)

        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)

        at java.lang.Thread.run(Thread.java:744)

Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)

        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)

        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)

        ... 3 more

Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)

Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)

        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)

        ... 8 more

15:22:47,617 INFO  org.apache.flink.runtime.taskmanager.Task                     - CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (89/95) switched to FAILED with exception.

java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)

        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)

        at java.lang.Thread.run(Thread.java:744)

Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)

        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)

        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)

        ... 3 more

Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)

Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)

        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)

        ... 8 more

15:22:47,619 INFO  org.apache.flink.runtime.taskmanager.Task                     - CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (87/95) switched to FAILED with exception.

java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)

        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)

        at java.lang.Thread.run(Thread.java:744)

Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)

        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)

        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)

        ... 3 more

Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)

Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)

        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)

        ... 8 more

15:22:47,627 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (87/95)

15:22:47,627 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (89/95)

15:22:47,664 INFO  org.apache.flink.yarn.YarnTaskManager                         - Unregistering task and sending final execution state FAILED to JobManager for task CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (8d6b8b11714a27ac1ca83b39bdee577f)

15:22:47,738 INFO  org.apache.flink.yarn.YarnTaskManager                         - Unregistering task and sending final execution state FAILED to JobManager for task CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (73c6c2f15159dcb134e3899064a30f33)

15:22:47,841 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN GroupReduce (GroupReduce at calculeMinArea(TransfoStage2StageOnTaz.java:159)) -> Map (Key Extractor 1) (88/95)

com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentReadRequest@26707c34

        at com.esotericsoftware.kryo.io.Input.fill(Input.java:148)

        at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:74)

        at com.esotericsoftware.kryo.io.Input.readLong_slow(Input.java:756)

        at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)

        at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)

        at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeLongField.read(UnsafeCacheFields.java:160)

        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)

        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:222)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:236)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)

        at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:86)

        at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:151)

        at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:85)

        at org.apache.flink.runtime.util.ReusingKeyGroupedIterator$ValuesIterator.hasNext(ReusingKeyGroupedIterator.java:186)

        at org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator.hasNext(TupleUnwrappingIterator.java:49)

        at com.bouygtel.kubera.processor.stage.TransfoStage2StageOnTaz$6.reduce(TransfoStage2StageOnTaz.java:170)

        at org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:101)

        at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:118)

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)

        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)

        at java.lang.Thread.run(Thread.java:744)

Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentReadRequest@26707c34

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.readBlock(AsynchronousBlockReader.java:75)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.readBlock(AsynchronousBlockReader.java:43)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.sendReadRequest(ChannelReaderInputView.java:259)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.nextSegment(ChannelReaderInputView.java:224)

        at org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:159)

        at org.apache.flink.runtime.memory.AbstractPagedInputView.read(AbstractPagedInputView.java:213)

        at org.apache.flink.api.java.typeutils.runtime.DataInputViewStream.read(DataInputViewStream.java:68)

        at com.esotericsoftware.kryo.io.Input.fill(Input.java:146)

        ... 25 more

 

     

(...)

______________________

15:22:51,798 INFO  org.apache.flink.yarn.YarnJobManager                          - Container container_e11_1453202008841_2868_01_000018 is completed with diagnostics: Container [pid=14548,containerID=container_e11_1453202008841_2868_01_000018] is running beyond physical memory limits. Current usage: 12.1 GB of 12 GB physical memory used; 13.0 GB of 25.2 GB virtual memory used. Killing container.

Dump of the process-tree for container_e11_1453202008841_2868_01_000018 :

        |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE

        |- 14548 14542 14548 14548 (bash) 0 0 108646400 310 /bin/bash -c /usr/java/default/bin/java -Xms7200m -Xmx7200m -XX:MaxDirectMemorySize=7200m  -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1> /data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.out 2> /data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.err --streamingMode batch

        |- 14558 14548 14548 14548 (java) 631070 15142 13881634816 3163462 /usr/java/default/bin/java -Xms7200m -Xmx7200m -XX:MaxDirectMemorySize=7200m -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnTaskManagerRunner --configDir . --streamingMode batch

 

Container killed on request. Exit code is 143

Container exited with a non-zero exit code 143
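Reading the numbers in this dump: the JVM was launched with -Xms7200m/-Xmx7200m plus -XX:MaxDirectMemorySize=7200m inside a 12 GB container, so the heap and a fully used direct-memory budget alone would add up to 7200 + 7200 = 14400 MiB, well above the container's 12 GB physical limit, before any native overhead (Netty arenas, HDFS client buffers, thread stacks, permgen) is even counted. The roughly 12.1 GB RSS that YARN reports is therefore reachable with the heap fully committed and only part of the direct-memory allowance actually in use.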

 

   

 

 

From: [hidden email] [mailto:[hidden email]] On behalf of Stephan Ewen
Sent: Tuesday, February 2, 2016 8:20 PM
To: [hidden email]
Subject: Re: Left join with unbalanced dataset

 

To make sure this discussion does not go in the wrong direction:

 

There is no issue here with data size or memory management. The memory management for sorting and hashing works, and Flink handles the spilling correctly.

 

The issue here is different:

   - One possible reason is that the network stack (specifically the Netty library) allocates too much direct (= off-heap) memory for buffering the TCP connections.

   - Another reason could be leaky behavior in Hadoop's HDFS code.

 

 

@Arnaud: We need the full log of the TaskManager that initially experiences the failure; then we can look into this. Best would be with memory logging activated, as suggested by Ufuk.
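For reference, that memory logging is switched on through flink-conf.yaml. A minimal sketch, assuming the 0.10-era configuration keys (worth double-checking against the configuration documentation of the exact release in use):

    # periodically log heap, direct-memory and garbage-collection stats in taskmanager.log
    taskmanager.debug.memory.startLogThread: true
    taskmanager.debug.memory.logIntervalMs: 10000

    # illustrative knob for leaving more of the YARN container to off-heap allocations
    # (the value here is purely an example)
    yarn.heap-cutoff-ratio: 0.4

With that enabled, the periodic memory lines in taskmanager.log should show whether the growth that trips the YARN limit is heap, non-heap, or direct memory.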

 

Best,

Stephan

 





Reply | Threaded
Open this post in threaded view
|

Re: Left join with unbalanced dataset

Fabian Hueske-2
Hi Arnaud,

in a previous mail you said:
"Note that I did not rebuild & reinstall flink, I just used a 0.10-SNAPSHOT compiled jar submitted as a batch job using the "0.10.0" flink installation"

This will not fix the Netty version error. You need to install a new Flink version, or submit the job to YARN with a new Flink version, so that the correct Netty version is actually used.

Best, Fabian
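In practice, that means launching the job from a freshly built 0.10-SNAPSHOT distribution, so that its lib/ directory (with the fixed Netty) is what gets shipped to YARN, rather than only bundling snapshot classes into the job jar. A rough sketch, where the memory sizes, path and main class are placeholders:

    # submit a single batch job to YARN from the unpacked snapshot build
    cd flink-0.10-SNAPSHOT
    ./bin/flink run -m yarn-cluster -yn 10 -yjm 2048 -ytm 12288 \
        -c com.example.MyBatchJob /path/to/my-batch-job.jar

Starting a YARN session with bin/yarn-session.sh from the same build and then submitting against it should work just as well.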

2016-02-03 10:44 GMT+01:00 Stephan Ewen <[hidden email]>:
Hi!

I think the closed channel is actually an effect of the process kill. Before the exception, you can see "15:22:47,592 ERROR org.apache.flink.yarn.YarnTaskManagerRunner - RECEIVED SIGNAL 15: SIGTERM" in the log, which means that UNIX is killing the process.
I assume that the first thing that happens is that UNIX closes the open file handles, while the JVM shutdown hooks are still in progress. Hence the exception.

So, the root cause is still the YARN memory killer.

The log comes from release version 0.10.0.
The Netty fix came into Flink after version 0.10.1 - so it is currently only in 0.10-SNAPSHOT (and will be in 0.10.2 in a couple of days).

Greetings,
Stephan


On Wed, Feb 3, 2016 at 10:11 AM, LINZ, Arnaud <[hidden email]> wrote:

Hi,

 

I see nothing wrong in the log of the killed container (it is in fact strange that it fails with an I/O channel closure before it is killed by YARN), but I'll post new logs with memory debugging enabled as a web download within the day.

 

In the meantime, a log extract:

 

Container: container_e11_1453202008841_2868_01_000018 on h1r1dn06.bpa.bouyguestelecom.fr_45454

================================================================================================

 

15:04:01,234 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   - --------------------------------------------------------------------------------

15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Starting YARN TaskManager (Version: 0.10.0, Rev:ab2cca4, Date:10.11.2015 @ 13:50:14 UTC)

15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Current user: datcrypt

15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.7/24.45-b08

15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Maximum heap size: 6900 MiBytes

15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  JAVA_HOME: /usr/java/default

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Hadoop version: 2.6.0

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  JVM Options:

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Xms7200m

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Xmx7200m

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -XX:MaxDirectMemorySize=7200m

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Dlogback.configurationFile=file:logback.xml

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Dlog4j.configuration=file:log4j.properties

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Program Arguments:

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     --configDir

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     .

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     --streamingMode

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     batch

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   - --------------------------------------------------------------------------------

15:04:02,215 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Starting TaskManager actor

15:04:02,224 INFO  org.apache.flink.runtime.io.network.netty.NettyConfig         - NettyConfig [server address: bt1shlhr/172.21.125.16, server port: 47002, memory segment size (bytes): 32768, transport type: NIO, number of server threads: 0 (use Netty's default), number of client threads: 0 (use Netty's default), server connect backlog: 0 (use Netty's default), client connect timeout (sec): 120, send/receive buffer size (bytes): 0 (use Netty's default)]

15:04:02,226 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Messages between TaskManager and JobManager have a max timeout of 100000 milliseconds

 

15:04:02,970 INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 1024 MB for network buffer pool (number of memory segments: 32768, bytes per segment: 32768).

15:04:03,527 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Using 0.7 of the currently free heap space for Flink managed heap memory (4099 MB).

15:04:06,250 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /data/1/hadoop/yarn/local/usercache/datcrypt/appcache/application_1453202008841_2868/flink-io-5cc3aa50-6723-460e-a722-800dd908e9e8 for spill files.

15:04:06,429 INFO  org.apache.flink.yarn.YarnTaskManager                         - TaskManager data connection information: h1r1dn06.bpa.bouyguestelecom.fr (dataPort=47002)

15:04:06,430 INFO  org.apache.flink.yarn.YarnTaskManager                         - TaskManager has 2 task slot(s).

15:04:06,431 INFO  org.apache.flink.yarn.YarnTaskManager                         - Memory usage stats: [HEAP: 5186/6900/6900 MB, NON HEAP: 25/50/130 MB (used/committed/max)]

15:04:06,438 INFO  org.apache.flink.yarn.YarnTaskManager                         - Trying to register at JobManager akka.tcp://flink@172.21.125.31:36518/user/jobmanager (attempt 1, timeout: 500 milliseconds)

15:04:06,591 INFO  org.apache.flink.yarn.YarnTaskManager                         - Successful registration at JobManager (akka.tcp://flink@172.21.125.31:36518/user/jobmanager), starting network stack and library cache.

 

(...)

