Flink 1.7.2 extremely unstable and losing jobs in prod

Bruno Aranda
Hi,

This is causing serious instability and data loss in our production environment. Any help figuring out what's going on here would be really appreciated.

We recently upgraded our two AWS EMR clusters from Flink 1.6.1 to Flink 1.7.2. The road to the upgrade was fairly rocky, but it seemed to be working sufficiently well in our pre-production environments that we rolled it out to prod.

However, we're now seeing the JobManager crash spontaneously several times a day. There doesn't seem to be any pattern to when this happens: it doesn't coincide with an increase in the data flowing through the system, nor does it happen at the same time of day.

The big problem is that when it recovers, sometimes a lot of the jobs fail to resume with the following exception:

org.apache.flink.util.FlinkException: JobManager responsible for 2401cd85e70698b25ae4fb2955f96fd0 lost the leadership.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.closeJobManagerConnection(TaskExecutor.java:1185)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$1200(TaskExecutor.java:138)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.lambda$notifyHeartbeatTimeout$0(TaskExecutor.java:1625)
//...
Caused by: java.util.concurrent.TimeoutException: The heartbeat of JobManager with id abb0e96af8966f93d839e4d9395c7697 timed out.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.lambda$notifyHeartbeatTimeout$0(TaskExecutor.java:1626)
    ... 16 more

Starting them manually afterwards doesn't resume from the last checkpoint, which for most jobs means they start from the end of the source Kafka topic. So whenever one of these surprise JobManager restarts happens, we have a ticking clock during which we're losing data.
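
(For reference, the kind of setup we would need for a manual restart to pick a checkpoint up again is sketched below; the interval, paths and CLI arguments are illustrative, not our actual configuration:)

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// checkpoint every minute and retain the latest checkpoint when the job fails or is cancelled
env.enableCheckpointing(60_000L);
env.getCheckpointConfig().enableExternalizedCheckpoints(
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// a manual restart would then point at the retained checkpoint, e.g.:
// flink run -s s3://<bucket>/checkpoints/<job-id>/chk-<n> <job-jar>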

We speculate that those jobs fail first and, while they are waiting to be restarted (they use a 30-second fixed-delay restart strategy), the JobManager itself restarts and does not recover them. In any case, we have never seen so many job failures and JM restarts with exactly the same EMR configuration.
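
(The 30-second delay strategy above refers to the per-job restart strategy; a minimal sketch of how it is configured, with an illustrative attempt count, would be:)

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// retry a fixed number of times, waiting 30 seconds between attempts
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(30, TimeUnit.SECONDS)));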

We're building some functionality that relies on the StreamingFileSink-over-S3 bug fixes in 1.7.2, so rolling back isn't an ideal option.
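
(That sink is wired up roughly as below; the bucket path is a placeholder and the simple row format is just the most basic choice, not necessarily what we actually use:)

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// given a DataStream<String> called records that should be written to S3:
StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("s3://<bucket>/<prefix>"), new SimpleStringEncoder<String>("UTF-8"))
        .build();
records.addSink(sink);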

Looking through the mailing list, we found https://issues.apache.org/jira/browse/FLINK-11843 - does it seem possible this might be related?

Best regards,

Bruno

Re: Flink 1.7.2 extremely unstable and losing jobs in prod

Andrey Zagrebin
Hi Bruno,

could you also share the job master logs?

Thanks,
Andrey

Re: Flink 1.7.2 extremely unstable and losing jobs in prod

Bruno Aranda
Hi Andrey,

Thanks for your response. I was trying to get the logs somewhere, but they are biggish (~4 MB). Can you suggest somewhere I could put them?

In any case, I can see exceptions like this:

2019/03/18 10:11:50,763 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing slot [SlotRequestId{ab89ff271ebf317a13a9e773aca4e9fb}] because: null
2019/03/18 10:11:50,807 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job alert-event-beeTrap-notifier (2ff941926e6ad80ba441d9cfcd7d689d) switched from state RUNNING to FAILING.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate all requires slots within timeout of 300000 ms. Slots required: 2, slots allocated: 0
at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$3(ExecutionGraph.java:991)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
...

It looks like a TM may crash, then the JM, and then the JM is not able to find slots for the tasks within a reasonable time frame. Oddly, we are running 13 TMs with 6 slots each (we used legacy mode in 1.6), and we always try to keep an extra TM's worth of free slots just in case. Looking at the dashboard, I see 12 TMs and 2 free slots, even though we tell Flink 13 TMs are available when we start the session in YARN.
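
(For reference, these seem to be the configuration knobs involved; the slot count is ours, while the timeout values shown are just the Flink defaults, which is what the 300000 ms in the exception above suggests we are running with:)

taskmanager.numberOfTaskSlots: 6
slot.request.timeout: 300000    # ms before the NoResourceAvailableException above is thrown (default)
heartbeat.interval: 10000       # ms between heartbeats (default)
heartbeat.timeout: 50000        # ms before a heartbeat, like the JobManager one above, times out (default)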

Any ideas? Things are way less stable for us these days, even though we have barely changed our settings since we started using Flink around 1.2 some time back.

Thanks,

Bruno



[Attachment: dashboard.png (52K)]

Re: Flink 1.7.2 extremely unstable and losing jobs in prod

Till Rohrmann
Hi Bruno,

could you upload the logs to https://transfer.sh/ or https://gist.github.com/ and then post a link? This will be crucial for further debugging. It would be really good if you could set the log level to DEBUG.

Concerning the number of registered TMs: the new mode (not the legacy mode) no longer respects the `-n` setting when you start a YARN session. Instead, it dynamically starts as many containers as are needed to run the submitted jobs. That's why you don't see the spare TM; this is the expected behaviour.
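
For illustration, a session started along these lines (the memory value is made up):

./bin/yarn-session.sh -n 13 -s 6 -tm 4096 -d
# in the new mode the "-n 13" is not enforced; TaskManager containers are brought up on demand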

The community intends to add support for ranges of how many TMs must be active at any given time [1].


Cheers,
Till

Re: Flink 1.7.2 extremely unstable and losing jobs in prod

Bruno Aranda
Ok, here it goes:


In an attempt to make the upload smaller, I removed the noisy "http wire" log lines and masked a couple of things. Not sure this covers everything you would like to see.

Thanks!

Bruno

Re: Flink 1.7.2 extremely unstable and losing jobs in prod

Till Rohrmann
Hi Bruno,

sorry for getting back to you so late. I just tried to access your logs to investigate the problem, but transfer.sh tells me that they are no longer there. Could you re-upload them or send them directly to my mail address? Sorry for not taking a look at your problem sooner and for the inconvenience with the upload.

Cheers,
Till

Re: Flink 1.7.2 extremely unstable and losing jobs in prod

Bruno Aranda
Hi Till,

Many thanks for your reply and don't worry. We understand this is tricky and you are busy. 

We have been experiencing several issues, and a couple of them have since been addressed, so those logs were probably no longer relevant anyway.

About losing jobs on restart: it seems that YARN was killing the container for the master because it was not passing the liveness probe. Since around Flink 1.1 we had been using very stringent liveness probe timeouts in YARN, so that we could detect very quickly when a node in the cluster was going out of service. That timeout (30 seconds) was probably killing the JobManager before it was able to recover the ~40 streaming jobs we run in session mode. I wonder why we had not seen that in 1.6, though; probably because of the legacy mode?
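
(For context, the YARN liveness settings in this area live in yarn-site.xml; which of these actually caused the container to be killed is exactly what we are not sure about:)

yarn.am.liveness-monitor.expiry-interval-ms    (default 600000 ms; how long the RM waits for application master heartbeats)
yarn.nm.liveness-monitor.expiry-interval-ms    (default 600000 ms; how long the RM waits for node manager heartbeats)
yarn.resourcemanager.am.max-attempts           (default 2; how many times YARN restarts the application master)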

Extreme instability: that was caused by our running with DEBUG logging to capture the logs; the sheer volume of them (especially from AsyncFunctions) filled the disks and led YARN to decommission the nodes. We do process many thousands of messages per second in some of our jobs.

We still see a few instances of the JobManagers losing leadership every few hours (all of them in the cluster at once). One of our jobs restarts more often than the others, but the "Exceptions" tab in the UI for that job just tells us that "The assigned slot XXX was removed". It would be helpful to see why it was removed, though.

I am currently looking into those, but the logs don't tell me much (and I cannot run at such a verbose log level in this environment anymore). There is only one thing at ERROR level around the time the more unstable job restarts:

java.util.concurrent.TimeoutException: Remote system has been silent for too long. (more than 48.0 hours)
    at akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:376)
    at akka.actor.Actor.aroundReceive(Actor.scala:502)
    at akka.actor.Actor.aroundReceive$(Actor.scala:500)
    at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

And for the other issue, all the JobManagers losing leadership, there are just some warnings about the association with the remote system failing, i.e.:

Remote connection to [null] failed with java.net.ConnectException: Connection refused: ip-10-10-56-193.eu-west-1.compute.internal/10.10.56.193:43041
Association with remote system [akka.tcp://[hidden email]:43041] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://[hidden email]:43041]] Caused by: [Connection refused: ip-10-10-56-193.eu-west-1.compute.internal/10.10.56.193:43041]

over and over again...

Thanks for any insight.

Bruno

Re: Flink 1.7.2 extremely unstable and losing jobs in prod

Till Rohrmann
Hi Bruno,

first of all, good to hear that you could resolve some of the problems.

Slots get removed if a TaskManager gets unregistered from the SlotPool. This usually happens if a TaskManager closes its connection or its heartbeat with the ResourceManager times out. So you could look for messages like "The heartbeat of TaskManager with id ... timed out".

If the JobManagers lose their leadership, it could also have something to do with the ZooKeeper cluster, the configuration of Flink's ZooKeeper client [1], or your network in general.
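
For reference, the client-side ZooKeeper settings live in flink-conf.yaml and look roughly like this (quorum and storage directory are placeholders):

high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: s3://<bucket>/flink/ha
high-availability.zookeeper.client.session-timeout: 60000    # ms (default)
high-availability.zookeeper.client.connection-timeout: 15000 # ms (default)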

For the warnings you see, it's hard to say without the full picture. Could `10.10.56.193:43041` be a TaskManager which just died and, hence, cannot be connected to anymore?


Cheers,
Till

Re: Flink 1.7.2 extremely unstable and losing jobs in prod

Bruno Aranda
Thanks Till, I will start separate threads for the two issues we are experiencing.

Cheers,

Bruno
