Flink job didn't restart when a task failed

Hanson, Bruce

Hello Flink folks:

We had a problem with a Flink job the other day that I haven’t seen before. One task encountered a failure and switched to FAILED (see the full exception below). After the failure, the task said it was notifying the Job Manager:

2020-04-06 08:21:04.329 [flink-akka.actor.default-dispatcher-55283] level=INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task FOG_PREDICTION_FUNCTION 3086efd0e57612710d0ea74138c01090.

But I see no evidence that the Job Manager got the message. With this type of failure, I would expect the Job Manager to restart the job. Instead, the job hobbled along until it stopped processing data and our user had to restart it manually. The job also started experiencing checkpoint timeouts on every checkpoint because this operator had stopped.

Had the job restarted when this happened, I believe everything would have been OK, as the job had an appropriate restart strategy in place. The Task Manager that this task was running on remained healthy and was actively processing other tasks.

This seems like some kind of bug. Is this something anyone has seen before? Could it be something that is already fixed in Flink 1.10?

We are running Flink 1.7.2. I know it’s rather old now. We run a managed environment where users can run their jobs, and we are in the process of upgrading to 1.10.

This is the full exception that started the problem:

 

2020-04-06 08:21:04.297 [FOG_PREDICTION_FUNCTION (15/20)] level=INFO  org.apache.flink.runtime.taskmanager.Task  - FOG_PREDICTION_FUNCTION (15/20) (3086efd0e57612710d0ea74138c01090) switched from RUNNING to FAILED.
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: Connection timed out (connection to '/100.112.98.121:36256')
        at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:165)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
        at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
        at org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerAdapter.exceptionCaught(ChannelHandlerAdapter.java:87)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
        at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1401)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
        at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:953)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.handleReadException(AbstractNioByteChannel.java:125)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:174)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connection timed out
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:192)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
        at org.apache.flink.shaded.netty4.io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
        at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1108)
        at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:345)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
        ... 6 common frames omitted

 

 

 

 

 

 

Bruce Hanson
Principal Engineer
M: +1 425 681 0422

HERE Technologies
701 Pike Street, Suite 2000
Seattle, WA 98101 USA
47° 36' 41" N 122° 19' 57" W

 

Re: Flink job didn't restart when a task failed

Aljoscha Krettek
Hi,

This does indeed seem very strange!

@Gary, could you have a look at this, since you have worked quite a bit on the scheduler?

Best,
Aljoscha

Re: Flink job didn't restart when a task failed

Till Rohrmann
Hi Bruce,

What you are describing does indeed sound quite bad. It is hard to say whether we fixed such an issue in 1.10, but upgrading is definitely worth a try.

To debug the problem further, it would be great if you could provide the log files of the JobMaster and the TaskExecutor, ideally at debug log level if you have them.

One thing we have wanted to add is sending the current task statuses as part of the heartbeat from the TM to the JM. Having this information would allow us to reconcile a situation like the one you are describing.
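
To make the reconciliation idea concrete, here is a minimal sketch of what such a check could look like. It is not Flink code: the class name, the `State` enum, and the `findMismatches` helper are hypothetical, and the heartbeat payload is assumed to be a simple map from execution ID to the state the TM currently holds for that task.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TaskStatusReconciliation {

    // Hypothetical, simplified set of task states (illustration only).
    enum State { RUNNING, FINISHED, CANCELED, FAILED }

    /**
     * Returns the execution IDs for which the JM's view differs from what the TM
     * reported in its heartbeat, including tasks the TM no longer reports at all.
     */
    static List<String> findMismatches(Map<String, State> jmView, Map<String, State> tmReport) {
        List<String> mismatches = new ArrayList<>();
        for (Map.Entry<String, State> entry : jmView.entrySet()) {
            State reported = tmReport.get(entry.getKey()); // null if the TM does not know the task
            if (reported != entry.getValue()) {
                mismatches.add(entry.getKey());
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        // The JM never received the FAILED update, so it still believes the task is RUNNING.
        Map<String, State> jmView = new LinkedHashMap<>();
        jmView.put("3086efd0e57612710d0ea74138c01090", State.RUNNING);
        jmView.put("some-other-task", State.RUNNING); // made-up ID

        // Assumed heartbeat payload: the TM's current state for each task.
        Map<String, State> tmReport = new LinkedHashMap<>();
        tmReport.put("3086efd0e57612710d0ea74138c01090", State.FAILED);
        tmReport.put("some-other-task", State.RUNNING);

        // Tasks listed here would need reconciling, for example by failing them on the JM side.
        System.out.println(findMismatches(jmView, tmReport));
    }
}
```

With a check like this on every heartbeat, a lost state update would be noticed at the next heartbeat instead of leaving the job stuck.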

Cheers,
Till

Re: Flink job didn't restart when a task failed

Till Rohrmann
For future reference, here is the issue to track the reconciliation logic [1].


Cheers,
Till

Re: Flink job didn't restart when a task failed

Zhu Zhu
Sorry for not following this ML earlier.

I think the cause might be that the final state ('FAILED') update message to the JM was lost. In that case, the TaskExecutor will simply fail the task, which has no effect because the task is already FAILED, and it will not send the task state update again.
@Bruce, could you take a look at the TM log? If this guess is right, the Task Manager log will contain a line of the form "Task {} is already in state FAILED."
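
If it helps, the check can be scripted. The following is a rough sketch only: it assumes the log line has the shape of the "Task {} is already in state FAILED." template above, and the class name, helper, and sample lines are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class AlreadyFailedLogScan {

    // Assumed line shape, taken from the "Task {} is already in state FAILED." template.
    // The leading greedy ".*" makes the match start at the last "Task " before the suffix,
    // so a logger name ending in "Task" is not captured as part of the task name.
    private static final Pattern ALREADY_FAILED =
            Pattern.compile(".*\\bTask (.+) is already in state FAILED");

    /** Returns the task names from all lines that match the assumed pattern. */
    static List<String> findAlreadyFailedTasks(List<String> logLines) {
        List<String> tasks = new ArrayList<>();
        for (String line : logLines) {
            Matcher matcher = ALREADY_FAILED.matcher(line);
            if (matcher.find()) {
                tasks.add(matcher.group(1));
            }
        }
        return tasks;
    }

    public static void main(String[] args) {
        // Made-up sample lines; a real TM log will carry its own prefix format.
        List<String> lines = Arrays.asList(
                "INFO Task - demo-task (1/2) switched from RUNNING to FAILED.",
                "INFO Task - Task demo-task (1/2) is already in state FAILED.");
        System.out.println(findAlreadyFailedTasks(lines));
    }
}
```

In practice the lines would be read from the Task Manager log file rather than from an in-memory list.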

Thanks,
Zhu Zhu

Till Rohrmann <[hidden email]> 于2020年4月10日周五 上午12:51写道:
For future reference, here is the issue to track the reconciliation logic [1].


Cheers,
Till

On Thu, Apr 9, 2020 at 6:47 PM Till Rohrmann <[hidden email]> wrote:
Hi Bruce,

what you are describing sounds indeed quite bad. Quite hard to say whether we fixed such an issue in 1.10. It is definitely worth a try to upgrade, though.

In order to further debug the problem, it would be really great if you could provide us with the log files of the JobMaster and the TaskExecutor. Ideally on debug log level if you have them.

One thing which we wanted to add is sending the current task statuses as part of the heartbeat from the TM to the JM. Having this information would allow us to reconcile a situation like you are describing.

Cheers,
Till

On Thu, Apr 9, 2020 at 1:57 PM Aljoscha Krettek <[hidden email]> wrote:
Hi,

this indeed seems very strange!

@Gary Could you maybe have a look at this since you work/worked quite a
bit on the scheduler?

Best,
Aljoscha



Re: Flink job didn't restart when a task failed

Hanson, Bruce

Hi Zhu Zhu (and Till),

 

Thanks for your thoughts on this problem. I do not see a message like the one you mention "Task {} is already in state FAILED." I have attached a file with all the task manager logs that we received at the time this happened. As you see, there aren’t many. We turned on debug logging for “org.apache.flink” on this job this afternoon so maybe we’ll find something interesting if/when the issue happens again. I do hope we can catch it in the act.

 

-Bruce

 


hhw1.log (14K) Download Attachment

Re: Flink job didn't restart when a task failed

Zhu Zhu
Sorry, I made a mistake. Even if my guess is right, you will not see the log line "Task {} is already in state FAILED.", because the task had already been unregistered before the attempt to update its state on the JM. Unfortunately, we currently have no log that can be used to prove it.
Just to confirm: the line "FOG_PREDICTION_FUNCTION (15/20) (3086efd0e57612710d0ea74138c01090) switched from RUNNING to FAILED" does not appear in the JM log, right? The message may have been lost on the network, which should be rare. Do you encounter this often?

Thanks,
Zhu Zhu
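
The check Zhu Zhu asks for can be done by hand with a text search, but it can also be sketched as a small program. The following is a minimal illustration, not a Flink tool: the class and method names are hypothetical, and the two regular expressions assume the log line formats quoted in this thread. It collects the execution IDs for which the TM logged "sending final execution state FAILED to JobManager" and removes those for which the JM logged a switch to FAILED. Whatever remains is a candidate for a lost update.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; line formats are assumed from the excerpts in this thread.
class LostUpdateLogCheck {

    // TM side: "... sending final execution state FAILED to JobManager for task <name> <id>."
    private static final Pattern TM_FINAL_FAILED = Pattern.compile(
            "sending final execution state FAILED to JobManager for task .+ ([0-9a-f]{32})");

    // JM side: "<name> (15/20) (<id>) switched from RUNNING to FAILED"
    private static final Pattern SWITCHED_TO_FAILED = Pattern.compile(
            "\\(([0-9a-f]{32})\\) switched from \\w+ to FAILED");

    /** Execution IDs reported FAILED by the TM that never show up as FAILED in the JM log. */
    static Set<String> failedOnTmButNotSeenByJm(List<String> tmLog, List<String> jmLog) {
        Set<String> ids = extract(TM_FINAL_FAILED, tmLog);
        ids.removeAll(extract(SWITCHED_TO_FAILED, jmLog));
        return ids;
    }

    private static Set<String> extract(Pattern pattern, List<String> lines) {
        Set<String> ids = new LinkedHashSet<>();
        for (String line : lines) {
            Matcher m = pattern.matcher(line);
            if (m.find()) {
                ids.add(m.group(1));
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        List<String> tmLog = Arrays.asList(
                "2020-04-06 08:21:04.329 level=INFO TaskExecutor - Un-registering task and sending "
                + "final execution state FAILED to JobManager for task FOG_PREDICTION_FUNCTION "
                + "3086efd0e57612710d0ea74138c01090.");
        List<String> jmLog = Collections.emptyList(); // the JM never logged the transition

        // prints [3086efd0e57612710d0ea74138c01090]
        System.out.println(failedOnTmButNotSeenByJm(tmLog, jmLog));
    }
}
```

In practice the two lists would be read from the TM and JM log files. A non-empty result only shows that the JM did not log the transition; it does not prove where the message was lost.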



Re: Flink job didn't restart when a task failed

Till Rohrmann
Keep us posted once you have caught the problem in the act. This would help tremendously in debugging and understanding the problem.

Cheers,
Till

>         at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:345)
>         at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
>         ... 6 common frames omitted
>
> Bruce Hanson
> Principal Engineer
> M: +1 425 681 0422
>
> HERE Technologies
> 701 Pike Street, Suite 2000
> Seattle, WA 98101 USA
> 47° 36' 41" N 122° 19' 57" W
>