Left join with unbalanced dataset


RE: Left join with unbalanced dataset

LINZ, Arnaud

Hi,

Yes, I’m always a bit reluctant to install a snapshot version "for everyone", and I was hoping the recompiled jar alone would suffice…

However, I’ve just recompiled everything and run it with a real 0.10.1 snapshot, and everything worked at an astounding speed with a reasonable amount of memory.

Thanks for the great work and the help, as always,

Arnaud

 

From: Fabian Hueske [mailto:[hidden email]]
Sent: Wednesday, 3 February 2016 10:51
To: [hidden email]
Subject: Re: Left join with unbalanced dataset

 

Hi Arnaud,

in a previous mail you said:
"Note that I did not rebuild & reinstall flink, I just used a 0.10-SNAPSHOT compiled jar submitted as a batch job using the "0.10.0" flink installation"

This will not fix the Netty version error. You need to install a new Flink version, or submit the Flink job to YARN with a new Flink version, to make sure that the correct Netty version is used.

Best, Fabian

 

2016-02-03 10:44 GMT+01:00 Stephan Ewen <[hidden email]>:

Hi!

 

I think the closed channel is actually an effect of the process kill. Before the exception, you can see "15:22:47,592 ERROR org.apache.flink.yarn.YarnTaskManagerRunner - RECEIVED SIGNAL 15: SIGTERM" in the log, which means that UNIX is killing the process.

I assume that the first thing that happens is that UNIX closes the open file handles, while the JVM shutdown hooks are still in progress. Hence the exception.

 

So, the root cause is still the YARN memory killer.
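
As an illustration of how the SIGTERM in the log relates to the exit code 143 that YARN reports for the container: by shell convention, a process terminated by signal N is reported with exit status 128 + N. The following is a minimal sketch of that decoding; the helper name `signal_from_exit_code` is hypothetical and not part of Flink or YARN.

```python
import signal


def signal_from_exit_code(exit_code):
    """Return the signal implied by a shell-style exit code, or None."""
    # Shell convention: exit codes above 128 mean "killed by signal (code - 128)".
    if exit_code <= 128:
        return None
    try:
        return signal.Signals(exit_code - 128)
    except ValueError:
        # The number does not correspond to a signal known on this platform.
        return None


# Exit code reported by YARN for the killed container.
killed_by = signal_from_exit_code(143)
print(killed_by.name if killed_by else "not a signal exit")
```

An exit code of 143 therefore only says that the process received SIGTERM; it does not say why, which is why the YARN diagnostics message is needed to identify the memory limit as the cause.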

 

The log comes from release version 0.10.0.

The Netty fix came into Flink after version 0.10.1 - so it is currently only in 0.10-SNAPSHOT (and will be in 0.10.2 in a couple of days).

 

Greetings,

Stephan

 

 

On Wed, Feb 3, 2016 at 10:11 AM, LINZ, Arnaud <[hidden email]> wrote:

Hi,

 

I see nothing wrong in the log of the killed container (it’s in fact strange that it fails with an I/O channel closure before it is killed by YARN), but I’ll post new logs with memory debugging enabled as a web download within the day.

 

In the meantime, here is a log extract:

 

Container: container_e11_1453202008841_2868_01_000018 on h1r1dn06.bpa.bouyguestelecom.fr_45454

================================================================================================

 

15:04:01,234 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   - --------------------------------------------------------------------------------

15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Starting YARN TaskManager (Version: 0.10.0, Rev:ab2cca4, Date:10.11.2015 @ 13:50:14 UTC)

15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Current user: datcrypt

15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.7/24.45-b08

15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Maximum heap size: 6900 MiBytes

15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  JAVA_HOME: /usr/java/default

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Hadoop version: 2.6.0

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  JVM Options:

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Xms7200m

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Xmx7200m

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -XX:MaxDirectMemorySize=7200m

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Dlogback.configurationFile=file:logback.xml

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Dlog4j.configuration=file:log4j.properties

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Program Arguments:

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     --configDir

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     .

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     --streamingMode

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     batch

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   - --------------------------------------------------------------------------------

15:04:02,215 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Starting TaskManager actor

15:04:02,224 INFO  org.apache.flink.runtime.io.network.netty.NettyConfig         - NettyConfig [server address: bt1shlhr/172.21.125.16, server port: 47002, memory segment size (bytes): 32768, transport type: NIO, number of server threads: 0 (use Netty's default), number of client threads: 0 (use Netty's default), server connect backlog: 0 (use Netty's default), client connect timeout (sec): 120, send/receive buffer size (bytes): 0 (use Netty's default)]

15:04:02,226 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Messages between TaskManager and JobManager have a max timeout of 100000 milliseconds

 

15:04:02,970 INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 1024 MB for network buffer pool (number of memory segments: 32768, bytes per segment: 32768).

15:04:03,527 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Using 0.7 of the currently free heap space for Flink managed heap memory (4099 MB).

15:04:06,250 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /data/1/hadoop/yarn/local/usercache/datcrypt/appcache/application_1453202008841_2868/flink-io-5cc3aa50-6723-460e-a722-800dd908e9e8 for spill files.

15:04:06,429 INFO  org.apache.flink.yarn.YarnTaskManager                         - TaskManager data connection information: h1r1dn06.bpa.bouyguestelecom.fr (dataPort=47002)

15:04:06,430 INFO  org.apache.flink.yarn.YarnTaskManager                         - TaskManager has 2 task slot(s).

15:04:06,431 INFO  org.apache.flink.yarn.YarnTaskManager                         - Memory usage stats: [HEAP: 5186/6900/6900 MB, NON HEAP: 25/50/130 MB (used/committed/max)]

15:04:06,438 INFO  org.apache.flink.yarn.YarnTaskManager                         - Trying to register at JobManager akka.tcp://flink@172.21.125.31:36518/user/jobmanager (attempt 1, timeout: 500 milliseconds)

15:04:06,591 INFO  org.apache.flink.yarn.YarnTaskManager                         - Successful registration at JobManager (akka.tcp://flink@172.21.125.31:36518/user/jobmanager), starting network stack and library cache.

 

15:17:22,191 INFO  org.apache.flink.yarn.YarnTaskManager                         - Unregistering task and sending final execution state FINISHED to JobManager for task DataSink (Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (c9dc588ceb209d98fd08b5144a59adfc)

15:17:22,196 INFO  org.apache.flink.runtime.taskmanager.Task                     - DataSink (Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (76/95) switched to FINISHED

15:17:22,197 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for DataSink (Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (76/95)

15:17:22,197 INFO  org.apache.flink.yarn.YarnTaskManager                         - Unregistering task and sending final execution state FINISHED to JobManager for task DataSink (Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (0c1c027e2ca5111e3e54c98b6d7265d7)

15:22:47,592 ERROR org.apache.flink.yarn.YarnTaskManagerRunner                   - RECEIVED SIGNAL 15: SIGTERM

15:22:47,608 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (89/95)

java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)

        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)

        at java.lang.Thread.run(Thread.java:744)

Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)

        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)

        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)

        ... 3 more

Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)

Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)

        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)

        ... 8 more

15:22:47,608 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (87/95)

java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)

        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)

        at java.lang.Thread.run(Thread.java:744)

Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)

        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)

        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)

        ... 3 more

Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)

Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)

        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)

        ... 8 more

15:22:47,617 INFO  org.apache.flink.runtime.taskmanager.Task                     - CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (89/95) switched to FAILED with exception.

java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)

        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)

        at java.lang.Thread.run(Thread.java:744)

Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)

        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)

        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)

        ... 3 more

Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)

Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)

        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)

        ... 8 more

15:22:47,619 INFO  org.apache.flink.runtime.taskmanager.Task                     - CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (87/95) switched to FAILED with exception.

java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)

        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)

        at java.lang.Thread.run(Thread.java:744)

Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)

        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)

        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)

        ... 3 more

Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)

Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)

        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)

        ... 8 more

15:22:47,627 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (87/95)

15:22:47,627 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (89/95)

15:22:47,664 INFO  org.apache.flink.yarn.YarnTaskManager                         - Unregistering task and sending final execution state FAILED to JobManager for task CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (8d6b8b11714a27ac1ca83b39bdee577f)

15:22:47,738 INFO  org.apache.flink.yarn.YarnTaskManager                         - Unregistering task and sending final execution state FAILED to JobManager for task CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (73c6c2f15159dcb134e3899064a30f33)

15:22:47,841 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN GroupReduce (GroupReduce at calculeMinArea(TransfoStage2StageOnTaz.java:159)) -> Map (Key Extractor 1) (88/95)

com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at com.esotericsoftware.kryo.io.Input.fill(Input.java:148)

        at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:74)

        at com.esotericsoftware.kryo.io.Input.readLong_slow(Input.java:756)

        at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)

        at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)

        at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeLongField.read(UnsafeCacheFields.java:160)

        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)

        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:222)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:236)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)

        at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:86)

        at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:151)

        at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:85)

        at org.apache.flink.runtime.util.ReusingKeyGroupedIterator$ValuesIterator.hasNext(ReusingKeyGroupedIterator.java:186)

        at org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator.hasNext(TupleUnwrappingIterator.java:49)

        at com.bouygtel.kubera.processor.stage.TransfoStage2StageOnTaz$6.reduce(TransfoStage2StageOnTaz.java:170)

        at org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:101)

        at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:118)

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)

        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)

        at java.lang.Thread.run(Thread.java:744)

Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.readBlock(AsynchronousBlockReader.java:75)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.readBlock(AsynchronousBlockReader.java:43)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.sendReadRequest(ChannelReaderInputView.java:259)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.nextSegment(ChannelReaderInputView.java:224)

        at org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:159)

        at org.apache.flink.runtime.memory.AbstractPagedInputView.read(AbstractPagedInputView.java:213)

        at org.apache.flink.api.java.typeutils.runtime.DataInputViewStream.read(DataInputViewStream.java:68)

        at com.esotericsoftware.kryo.io.Input.fill(Input.java:146)

        ... 25 more

 

     

(...)

______________________

15:22:51,798 INFO  org.apache.flink.yarn.YarnJobManager                          - Container container_e11_1453202008841_2868_01_000018 is completed with diagnostics: Container [pid=14548,containerID=container_e11_1453202008841_2868_01_000018] is running beyond physical memory limits. Current usage: 12.1 GB of 12 GB physical memory used; 13.0 GB of 25.2 GB virtual memory used. Killing container.

Dump of the process-tree for container_e11_1453202008841_2868_01_000018 :

        |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE

        |- 14548 14542 14548 14548 (bash) 0 0 108646400 310 /bin/bash -c /usr/java/default/bin/java -Xms7200m -Xmx7200m -XX:MaxDirectMemorySize=7200m  -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1> /data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.out 2> /data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.err --streamingMode batch

        |- 14558 14548 14548 14548 (java) 631070 15142 13881634816 3163462 /usr/java/default/bin/java -Xms7200m -Xmx7200m -XX:MaxDirectMemorySize=7200m -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnTaskManagerRunner --configDir . --streamingMode batch

 

Container killed on request. Exit code is 143

Container exited with a non-zero exit code 143
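
The figures in the process-tree dump can be cross-checked against the "12.1 GB of 12 GB" in the diagnostics line. The sketch below converts the RSSMEM_USAGE(PAGES) column to bytes. It assumes a 4096-byte page size (typical for Linux on x86-64, but not stated in the log) and that YARN's "GB" means 2^30 bytes; both are assumptions.

```python
PAGE_SIZE_BYTES = 4096  # assumption: typical Linux x86-64 page size, not stated in the log

# RSSMEM_USAGE(PAGES) values copied from the process-tree dump
rss_pages = {
    "bash (pid 14548)": 310,
    "java (pid 14558)": 3163462,
}

total_bytes = sum(rss_pages.values()) * PAGE_SIZE_BYTES
total_gib = total_bytes / 2**30
limit_gib = 12  # physical memory limit reported by YARN (assumed to be GiB)

print(f"resident memory: {total_gib:.2f} GiB, limit: {limit_gib} GiB")
print("over limit:", total_gib > limit_gib)
```

Under these assumptions the resident set is only slightly above the limit, which is consistent with a process whose memory grows gradually until YARN's periodic check catches it.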

 

   

 

 

From: [hidden email] [mailto:[hidden email]] On behalf of Stephan Ewen
Sent: Tuesday, February 2, 2016 20:20
To: [hidden email]
Subject: Re: Left join with unbalanced dataset

 

To make sure this discussion does not go in the wrong direction:

 

There is no issue here with data size, or memory management. The MemoryManagement for sorting and hashing works, and Flink handles the spilling correctly, etc.

 

The issue here is a different one:

   - One possible reason is that the network stack (specifically the Netty library) allocates too much direct (= off heap) memory for buffering the TCP connections.

   - Another reason could be leaky behavior in Hadoop's HDFS code.
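
To see why off-heap allocations can matter here, it may help to compare the JVM limits from the TaskManager command line with the container size. The sketch below is a rough budget calculation, not a diagnosis: the flag string is copied from the log, the 12 GB container limit comes from the YARN diagnostics, and the helper name `flag_mib` is made up for this example. It ignores other native memory (thread stacks, class metadata and so on), so the real headroom would be smaller still.

```python
import re

# JVM flags as they appear in the TaskManager command line in the log
JVM_FLAGS = "-Xms7200m -Xmx7200m -XX:MaxDirectMemorySize=7200m"
CONTAINER_LIMIT_MIB = 12 * 1024  # "12 GB physical memory" from the YARN diagnostics


def flag_mib(prefix: str, flags: str) -> int:
    """Return the value in MiB of a JVM size flag written as '<prefix><n>m' (hypothetical helper)."""
    match = re.search(re.escape(prefix) + r"(\d+)m\b", flags)
    if match is None:
        raise ValueError(f"flag {prefix!r} not found")
    return int(match.group(1))


heap_max = flag_mib("-Xmx", JVM_FLAGS)
direct_max = flag_mib("-XX:MaxDirectMemorySize=", JVM_FLAGS)

# Room left in the container once the heap is fully used
off_heap_room = CONTAINER_LIMIT_MIB - heap_max
# How far heap + direct memory may grow past the container limit
# before the JVM itself would refuse a direct allocation
possible_overshoot = heap_max + direct_max - CONTAINER_LIMIT_MIB

print(f"heap max: {heap_max} MiB, direct max: {direct_max} MiB")
print(f"off-heap room inside the container: {off_heap_room} MiB")
print(f"JVM limits exceed the container by up to {possible_overshoot} MiB")
```

With these settings the JVM would permit more direct memory than the container has room for, so the container limit can be hit first. That is consistent with either of the two suspected causes and does not distinguish between them.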

 

 

@Arnaud: We need the full log of the TaskManager that first experienced the failure; then we can look into this. Ideally it would be captured with memory logging activated, as Ufuk suggested.

 

Best,

Stephan

 

 



The integrity of this message cannot be guaranteed on the Internet. The company that sent this message cannot therefore be held liable for its content nor attachments. Any unauthorized use or dissemination is prohibited. If you are not the intended recipient of this message, then please delete it and notify the sender.

 

 


Re: Left join with unbalanced dataset

Fabian Hueske-2
Glad to hear that!
We will release Flink 0.10.2 (based on the release-0.10 branch) soon.

Best, Fabian

2016-02-03 14:49 GMT+01:00 LINZ, Arnaud <[hidden email]>:

Hi,

Yes, I’m always a bit reluctant to install a snapshot version "for everyone", and I was hoping it would suffice…

However, I’ve just recompiled everything and ran with a real 0.10.1 snapshot, and everything worked at an astounding speed with a reasonable amount of memory.

Thanks for the great work and the help, as always,

Arnaud

 

From: Fabian Hueske [mailto:[hidden email]]
Sent: Wednesday, February 3, 2016 10:51
To: [hidden email]
Subject: Re: Left join with unbalanced dataset

 

Hi Arnaud,

in a previous mail you said:
"Note that I did not rebuild & reinstall flink, I just used a 0.10-SNAPSHOT compiled jar submitted as a batch job using the "0.10.0" flink installation"

This will not fix the Netty version error. You need to install a new Flink version, or submit the Flink job to YARN with a new Flink version, to make sure that the correct Netty version is used.

Best, Fabian

 

2016-02-03 10:44 GMT+01:00 Stephan Ewen <[hidden email]>:

Hi!

 

I think the closed channel is actually an effect of the process kill. Before the exception, you can see "15:22:47,592 ERROR org.apache.flink.yarn.YarnTaskManagerRunner - RECEIVED SIGNAL 15: SIGTERM" in the log, which means that UNIX is killing the process.

I assume that the first thing that happens is that UNIX closes the open file handles, while the JVM shutdown hooks are still in progress. Hence the exception.

 

So, the root cause is still the YARN memory killer.
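
The ordering argument above can be checked mechanically against the log. The following sketch is illustrative only: the three lines are shortened copies of TaskManager log entries from this thread (class names kept, long messages cut), and the helper `parse_time` is a made-up name. It confirms that no ERROR entry precedes the SIGTERM line and measures the gap to the first task error.

```python
from datetime import datetime, timedelta

# Shortened copies of TaskManager log lines quoted in this thread
LOG_LINES = [
    "15:17:22,197 INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources for DataSink",
    "15:22:47,592 ERROR org.apache.flink.yarn.YarnTaskManagerRunner - RECEIVED SIGNAL 15: SIGTERM",
    "15:22:47,608 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code",
]


def parse_time(line: str) -> datetime:
    """Parse the leading 'HH:MM:SS,mmm' timestamp of a log line (hypothetical helper)."""
    return datetime.strptime(line.split()[0], "%H:%M:%S,%f")


# Collect (timestamp, message) pairs for ERROR entries, in time order
errors = sorted(
    (parse_time(line), line.split(" - ", 1)[1])
    for line in LOG_LINES
    if line.split()[1] == "ERROR"
)

first_error_time, first_error_msg = errors[0]
sigterm_is_first_error = "SIGTERM" in first_error_msg

# Gap between the signal and the next ERROR entry, in whole milliseconds
gap_ms = (errors[1][0] - first_error_time) // timedelta(milliseconds=1)

print(f"SIGTERM is the first ERROR entry: {sigterm_is_first_error}")
print(f"first task error follows {gap_ms} ms later")
```

On these lines the task error comes 16 ms after the signal, which fits the reading that the closed I/O channel is a consequence of the kill rather than its cause.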

 

The log comes from release version 0.10.0.

The Netty fix came into Flink after version 0.10.1 - so it is currently only in 0.10-SNAPSHOT (and will be in 0.10.2 in a couple of days).

 

Greetings,

Stephan

 

 

On Wed, Feb 3, 2016 at 10:11 AM, LINZ, Arnaud <[hidden email]> wrote:

Hi,

 

I see nothing wrong in the log of the killed container (it’s in fact strange that it fails with I/O channel closure before it is killed by yarn), but I’ll post new logs with memory debug as a web download within the day.

 

In the meantime, here is a log extract:

 

Container: container_e11_1453202008841_2868_01_000018 on h1r1dn06.bpa.bouyguestelecom.fr_45454

================================================================================================

 

15:04:01,234 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   - --------------------------------------------------------------------------------

15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Starting YARN TaskManager (Version: 0.10.0, Rev:ab2cca4, Date:10.11.2015 @ 13:50:14 UTC)

15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Current user: datcrypt

15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.7/24.45-b08

15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Maximum heap size: 6900 MiBytes

15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  JAVA_HOME: /usr/java/default

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Hadoop version: 2.6.0

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  JVM Options:

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Xms7200m

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Xmx7200m

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -XX:MaxDirectMemorySize=7200m

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Dlogback.configurationFile=file:logback.xml

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Dlog4j.configuration=file:log4j.properties

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Program Arguments:

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     --configDir

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     .

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     --streamingMode

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     batch

15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   - --------------------------------------------------------------------------------

15:04:02,215 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Starting TaskManager actor

15:04:02,224 INFO  org.apache.flink.runtime.io.network.netty.NettyConfig         - NettyConfig [server address: bt1shlhr/172.21.125.16, server port: 47002, memory segment size (bytes): 32768, transport type: NIO, number of server threads: 0 (use Netty's default), number of client threads: 0 (use Netty's default), server connect backlog: 0 (use Netty's default), client connect timeout (sec): 120, send/receive buffer size (bytes): 0 (use Netty's default)]

15:04:02,226 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Messages between TaskManager and JobManager have a max timeout of 100000 milliseconds

 

15:04:02,970 INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 1024 MB for network buffer pool (number of memory segments: 32768, bytes per segment: 32768).

15:04:03,527 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Using 0.7 of the currently free heap space for Flink managed heap memory (4099 MB).

15:04:06,250 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /data/1/hadoop/yarn/local/usercache/datcrypt/appcache/application_1453202008841_2868/flink-io-5cc3aa50-6723-460e-a722-800dd908e9e8 for spill files.

15:04:06,429 INFO  org.apache.flink.yarn.YarnTaskManager                         - TaskManager data connection information: h1r1dn06.bpa.bouyguestelecom.fr (dataPort=47002)

15:04:06,430 INFO  org.apache.flink.yarn.YarnTaskManager                         - TaskManager has 2 task slot(s).

15:04:06,431 INFO  org.apache.flink.yarn.YarnTaskManager                         - Memory usage stats: [HEAP: 5186/6900/6900 MB, NON HEAP: 25/50/130 MB (used/committed/max)]

15:04:06,438 INFO  org.apache.flink.yarn.YarnTaskManager                         - Trying to register at JobManager akka.tcp://flink@172.21.125.31:36518/user/jobmanager (attempt 1, timeout: 500 milliseconds)

15:04:06,591 INFO  org.apache.flink.yarn.YarnTaskManager                         - Successful registration at JobManager (akka.tcp://flink@172.21.125.31:36518/user/jobmanager), starting network stack and library cache.

 

15:17:22,191 INFO  org.apache.flink.yarn.YarnTaskManager                         - Unregistering task and sending final execution state FINISHED to JobManager for task DataSink (Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (c9dc588ceb209d98fd08b5144a59adfc)

15:17:22,196 INFO  org.apache.flink.runtime.taskmanager.Task                     - DataSink (Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (76/95) switched to FINISHED

15:17:22,197 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for DataSink (Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (76/95)

15:17:22,197 INFO  org.apache.flink.yarn.YarnTaskManager                         - Unregistering task and sending final execution state FINISHED to JobManager for task DataSink (Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (0c1c027e2ca5111e3e54c98b6d7265d7)

15:22:47,592 ERROR org.apache.flink.yarn.YarnTaskManagerRunner                   - RECEIVED SIGNAL 15: SIGTERM

15:22:47,608 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (89/95)

java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)

        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)

        at java.lang.Thread.run(Thread.java:744)

Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)

        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)

        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)

        ... 3 more

Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)

Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)

        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)

        ... 8 more

15:22:47,608 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (87/95)

java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)

        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)

        at java.lang.Thread.run(Thread.java:744)

Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)

        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)

        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)

        ... 3 more

Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)

Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)

        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)

        ... 8 more

15:22:47,617 INFO  org.apache.flink.runtime.taskmanager.Task                     - CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (89/95) switched to FAILED with exception.

java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)

        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)

        at java.lang.Thread.run(Thread.java:744)

Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)

        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)

        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)

        ... 3 more

Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)

Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)

        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)

        ... 8 more

15:22:47,619 INFO  org.apache.flink.runtime.taskmanager.Task                     - CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (87/95) switched to FAILED with exception.

java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)

        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)

        at java.lang.Thread.run(Thread.java:744)

Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)

        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)

        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)

        ... 3 more

Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)

Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)

        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)

        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)

        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)

        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)

        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)

        ... 8 more

15:22:47,627 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (87/95)

15:22:47,627 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (89/95)

15:22:47,664 INFO  org.apache.flink.yarn.YarnTaskManager                         - Unregistering task and sending final execution state FAILED to JobManager for task CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (8d6b8b11714a27ac1ca83b39bdee577f)

15:22:47,738 INFO  org.apache.flink.yarn.YarnTaskManager                         - Unregistering task and sending final execution state FAILED to JobManager for task CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (73c6c2f15159dcb134e3899064a30f33)

15:22:47,841 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN GroupReduce (GroupReduce at calculeMinArea(TransfoStage2StageOnTaz.java:159)) -> Map (Key Extractor 1) (88/95)

com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at com.esotericsoftware.kryo.io.Input.fill(Input.java:148)

        at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:74)

        at com.esotericsoftware.kryo.io.Input.readLong_slow(Input.java:756)

        at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)

        at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)

        at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeLongField.read(UnsafeCacheFields.java:160)

        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)

        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:222)

        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:236)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)

        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)

        at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:86)

        at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:151)

        at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:85)

        at org.apache.flink.runtime.util.ReusingKeyGroupedIterator$ValuesIterator.hasNext(ReusingKeyGroupedIterator.java:186)

        at org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator.hasNext(TupleUnwrappingIterator.java:49)

        at com.bouygtel.kubera.processor.stage.TransfoStage2StageOnTaz$6.reduce(TransfoStage2StageOnTaz.java:170)

        at org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:101)

        at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:118)

        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)

        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)

        at java.lang.Thread.run(Thread.java:744)

Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: [hidden email]

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.readBlock(AsynchronousBlockReader.java:75)

        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.readBlock(AsynchronousBlockReader.java:43)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.sendReadRequest(ChannelReaderInputView.java:259)

        at org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.nextSegment(ChannelReaderInputView.java:224)

        at org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:159)

        at org.apache.flink.runtime.memory.AbstractPagedInputView.read(AbstractPagedInputView.java:213)

        at org.apache.flink.api.java.typeutils.runtime.DataInputViewStream.read(DataInputViewStream.java:68)

        at com.esotericsoftware.kryo.io.Input.fill(Input.java:146)

        ... 25 more

 

     

(...)

______________________

15:22:51,798 INFO  org.apache.flink.yarn.YarnJobManager                          - Container container_e11_1453202008841_2868_01_000018 is completed with diagnostics: Container [pid=14548,containerID=container_e11_1453202008841_2868_01_000018] is running beyond physical memory limits. Current usage: 12.1 GB of 12 GB physical memory used; 13.0 GB of 25.2 GB virtual memory used. Killing container.

Dump of the process-tree for container_e11_1453202008841_2868_01_000018 :

        |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE

        |- 14548 14542 14548 14548 (bash) 0 0 108646400 310 /bin/bash -c /usr/java/default/bin/java -Xms7200m -Xmx7200m -XX:MaxDirectMemorySize=7200m  -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1> /data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.out 2> /data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.err --streamingMode batch

        |- 14558 14548 14548 14548 (java) 631070 15142 13881634816 3163462 /usr/java/default/bin/java -Xms7200m -Xmx7200m -XX:MaxDirectMemorySize=7200m -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnTaskManagerRunner --configDir . --streamingMode batch

 

Container killed on request. Exit code is 143

Container exited with a non-zero exit code 143
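
As a sanity check on the dump above, the RSS column can be converted by hand. This is only a sketch: the 4 KiB page size is an assumption (the usual Linux x86-64 default, not stated in the log), and the class and method names are made up for illustration.

```java
public class ContainerRss {
    // Assumption: 4 KiB memory pages; the log does not state the page size.
    static final long PAGE_SIZE_BYTES = 4096L;
    static final double BYTES_PER_GIB = 1024.0 * 1024.0 * 1024.0;

    // Converts an RSSMEM_USAGE(PAGES) value from the process-tree dump to GiB.
    static double rssGiB(long rssPages) {
        return rssPages * PAGE_SIZE_BYTES / BYTES_PER_GIB;
    }

    // Converts a VMEM_USAGE(BYTES) value from the process-tree dump to GiB.
    static double bytesToGiB(long bytes) {
        return bytes / BYTES_PER_GIB;
    }

    public static void main(String[] args) {
        long bashRssPages = 310L;        // pid 14548 (bash wrapper)
        long javaRssPages = 3163462L;    // pid 14558 (TaskManager JVM)
        long javaVmemBytes = 13881634816L;

        double rss = rssGiB(bashRssPages + javaRssPages);
        double vmem = bytesToGiB(javaVmemBytes);

        // Under the 4 KiB assumption the resident set comes out just above the
        // 12 GB limit, which is consistent with the "12.1 GB of 12 GB" diagnostic.
        System.out.println("process-tree RSS (GiB): " + rss);
        System.out.println("java VMEM (GiB):        " + vmem);
        System.out.println("over 12 GiB limit:      " + (rss > 12.0));
    }
}
```

Nearly all of the resident memory belongs to the JVM process itself; the bash wrapper is negligible.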

 

   

 

 

From: [hidden email] [mailto:[hidden email]] On behalf of Stephan Ewen
Sent: Tuesday, February 2, 2016 20:20
To: [hidden email]
Subject: Re: Left join with unbalanced dataset

 

To make sure this discussion does not go in the wrong direction:

 

There is no issue here with data size or memory management. The memory management for sorting and hashing works, Flink handles the spilling correctly, etc.

 

The issue here is different:

   - One possible reason is that the network stack (specifically the Netty library) allocates too much direct (= off-heap) memory for buffering the TCP connections.

   - Another reason could be leaky behavior in Hadoop's HDFS code.
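
To illustrate why off-heap allocation matters here, the JVM flags from the container's process dump (`-Xmx7200m -XX:MaxDirectMemorySize=7200m`) can be compared with the 12 GB YARN limit. The sketch below is plain arithmetic, not Flink code; the class name is hypothetical, and reading "12 GB" as 12 × 1024 MiB is an assumption.

```java
public class JvmMemoryBudget {
    // Upper bound on heap plus direct memory permitted by the JVM flags.
    // Metaspace, thread stacks and other native memory are NOT included,
    // so the real footprint of the process can be higher still.
    static long heapPlusDirectMiB(long maxHeapMiB, long maxDirectMiB) {
        return maxHeapMiB + maxDirectMiB;
    }

    // True if heap and direct memory can both fill up without exceeding the container.
    static boolean fitsInContainer(long maxHeapMiB, long maxDirectMiB, long containerMiB) {
        return heapPlusDirectMiB(maxHeapMiB, maxDirectMiB) <= containerMiB;
    }

    public static void main(String[] args) {
        long maxHeapMiB = 7200L;         // -Xmx7200m
        long maxDirectMiB = 7200L;       // -XX:MaxDirectMemorySize=7200m
        long containerMiB = 12L * 1024L; // assumption: YARN's "12 GB" = 12288 MiB

        long permitted = heapPlusDirectMiB(maxHeapMiB, maxDirectMiB);
        System.out.println("heap + direct permitted: " + permitted + " MiB");
        System.out.println("container limit:         " + containerMiB + " MiB");
        System.out.println("fits: " + fitsInContainer(maxHeapMiB, maxDirectMiB, containerMiB));
    }
}
```

With these flags the JVM is allowed to use more heap plus direct memory than the container holds, so a library that allocates direct buffers aggressively can push the process over the limit even when the heap itself is healthy. Whether that is what actually happens in this job is exactly what the TaskManager log should tell us.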

 

 

@Arnaud: We need the full log of the TaskManager that first experiences the failure; then we can look into this. Ideally it would be with memory logging activated, as suggested by Ufuk.

 

Best,

Stephan

 

 




 

 

