Batch mode with Flink 1.8 unstable?

Ken Krugler
Hi all,

I’ve been running a somewhat complex batch job (in EMR/YARN) with Flink 1.8.0, and it regularly fails, but for varying reasons.

Has anyone else had stability issues with 1.8.0 in batch mode and non-trivial workflows?

Thanks,

— Ken

1. TimeoutException getting input splits

The batch job starts by processing a lot of files that live in S3. During this phase, I sometimes see:

2019-06-20 01:20:22,659 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN DataSource (at createInput(ExecutionEnvironment.java:549) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> Map (ad dailies) -> Filter (Filter at createWorkflow(AdvertiserSimilarityWorkflow.java:34)) -> Filter (Filter at createWorkflow(AdvertiserSimilarityWorkflow.java:36)) -> Filter (Filter at createWorkflow(AdvertiserSimilarityWorkflow.java:38)) -> Map (Key Extractor) -> Combine (Reduce at createWorkflow(AdvertiserSimilarityWorkflow.java:41)) (31/32) (8a8cbea47394c3d638910c36ac62d877) switched from RUNNING to FAILED.
java.lang.RuntimeException: Could not retrieve next input split.
at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
... 3 more
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
... 4 more
2019-06-20 01:20:22,664 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Java Job at Thu Jun 20 01:11:28 UTC 2019 (5564b8980f40d788d7ef312318709e4d) switched from state RUNNING to FAILING.
java.lang.RuntimeException: Could not retrieve next input split.
at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
... 3 more
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
... 4 more

I saw [hidden email]’s email recently about a similar issue:

I figured this out myself. In my yarn container logs I saw this warning/error,

akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@HOST:43911/temp/$n]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.jobmaster.SerializedInputSplit was 15728643 bytes.

Looking into this, there is a max frame size for Akka, which in Flink can be set with akka.framesize and is 10 MB by default. Increasing this past the size of my side input fixed the issue. I'm guessing this is due to creating the side input PCollection from memory using the Create.of APIs.

But no such akka.remote.OversizedPayloadException appears in any of my log files.
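
For anyone who does find such a line in their container logs, the two byte counts in it show how far akka.framesize would need to be raised. A minimal sketch in plain Java that pulls them out; the class and method names are hypothetical and not part of Flink or Akka, and the sample line is the one quoted above.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for reading log lines; not part of Flink or Akka.
final class OversizedPayloadLine {
    // Matches the two byte counts in the exception message quoted above.
    private static final Pattern SIZES =
            Pattern.compile("max allowed size (\\d+) bytes.*?was (\\d+) bytes");

    /** Returns {maxAllowed, actual} in bytes, or null if the line does not match. */
    static long[] parseSizes(String logLine) {
        Matcher m = SIZES.matcher(logLine);
        if (!m.find()) {
            return null;
        }
        return new long[] {Long.parseLong(m.group(1)), Long.parseLong(m.group(2))};
    }

    /** Number of bytes by which the payload exceeded the limit. */
    static long overshoot(long[] sizes) {
        return sizes[1] - sizes[0];
    }

    public static void main(String[] args) {
        String line = "akka.remote.OversizedPayloadException: Discarding oversized payload sent to "
                + "Actor[akka.tcp://flink@HOST:43911/temp/$n]: max allowed size 10485760 bytes, "
                + "actual size of encoded class "
                + "org.apache.flink.runtime.jobmaster.SerializedInputSplit was 15728643 bytes.";
        long[] sizes = parseSizes(line);
        System.out.println("limit=" + sizes[0] + " actual=" + sizes[1]
                + " overshoot=" + overshoot(sizes));
    }
}
```

For the quoted line the payload is 5242883 bytes over the limit, so the frame size would have to be raised to at least 15728643 bytes for that message to get through.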

2. TM released too soon?

Sometimes it fails with "Connecting the channel failed: Connecting to remote task manager xxx has failed. This might indicate that the remote task manager has been lost".

I’d run into this previously with 1.7.2, but thought that 1.8.0 had the fix for https://issues.apache.org/jira/browse/FLINK-10941, and thus I’d avoid the problem, but it seems like there’s still an issue.

I’m running 3 TMs on three servers, each with 32 slots. When the job fails, the servers are under heavy CPU load.

From the logs, I see the JobManager releasing two of the TMs, then requesting two new containers. One of these requests gets filled, and that new TM starts getting tasks for its slots.

But then soon afterwards that new TM and the one original TM still left around start failing because they aren’t getting data from (I think) the other TM that was released.

Any thoughts on what’s going wrong? Is the bug not actually fully fixed? Or is there some TM timeout value that I should bump?

In the job manager log file I see where the two TMs are getting released...

2019-05-17 17:42:50,215 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl      - Releasing idle slot [d947cd800b0ef2671259c7b048c3f7fc].
2019-05-17 17:43:38,942 INFO  org.apache.flink.yarn.YarnResourceManager                     - Stopping container container_1558074033518_0003_01_000002.
2019-05-17 17:43:38,978 INFO  org.apache.flink.yarn.YarnResourceManager                     - Closing TaskExecutor connection container_1558074033518_0003_01_000002 because: TaskExecutor exceeded the idle timeout.
2019-05-17 17:43:38,978 INFO  org.apache.flink.yarn.YarnResourceManager                     - Stopping container container_1558074033518_0003_01_000004.
2019-05-17 17:43:38,998 INFO  org.apache.flink.yarn.YarnResourceManager                     - Closing TaskExecutor connection container_1558074033518_0003_01_000004 because: TaskExecutor exceeded the idle timeout.
2019-05-17 17:43:39,005 WARN  org.apache.flink.yarn.YarnResourceManager                     - Discard registration from TaskExecutor container_1558074033518_0003_01_000002 at (akka.tcp://flink@ip-10-28-81-66.ec2.internal:36311/user/taskmanager_0) because the framework did not recognize it
2019-05-17 17:43:39,006 WARN  org.apache.flink.yarn.YarnResourceManager                     - Discard registration from TaskExecutor container_1558074033518_0003_01_000004 at (akka.tcp://flink@ip-10-47-197-146.ec2.internal:44403/user/taskmanager_0) because the framework did not recognize it

And then later on the requests for the replacement TMs.

2019-05-17 17:45:01,655 INFO  org.apache.flink.yarn.YarnResourceManager                     - Requesting new TaskExecutor container with resources <memory:44000, vCores:32>. Number pending requests 1.
2019-05-17 17:45:01,662 INFO  org.apache.flink.yarn.YarnResourceManager                     - Requesting new TaskExecutor container with resources <memory:44000, vCores:32>. Number pending requests 2.

And then one of the requests is satisfied:

2019-05-17 17:45:04,360 INFO  org.apache.flink.yarn.YarnResourceManager                     - Received new container: container_1558074033518_0003_01_000006 - Remaining pending container requests: 2
2019-05-17 17:45:04,360 INFO  org.apache.flink.yarn.YarnResourceManager                     - Removing container request Capability[<memory:44000, vCores:32>]Priority[1]. Pending container requests 1.
2019-05-17 17:45:04,836 INFO  org.apache.flink.yarn.YarnResourceManager                     - Creating container launch context for TaskManagers
2019-05-17 17:45:04,837 INFO  org.apache.flink.yarn.YarnResourceManager                     - Starting TaskManagers

So it seems like TMs are being allocated, but soon afterwards:

2019-05-17 17:45:12,907 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN Map (Map at createWorkflow(AdvertiserSimilarityWorkflow.java:127)) -> Map (Key Extractor) (73/96) (1e16d6ca293330933b7cece67644635f) switched from RUNNING to FAILED.
java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'ip-10-28-81-66.ec2.internal/10.28.81.66:40317' has failed. This might indicate that the remote task manager has been lost.
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:133)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:69)
	at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:60)
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:166)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:494)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:525)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508)
	at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
	at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
	at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:101)
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)

On one of the TMs that was released, I see at the end of its log:

2019-05-17 17:42:50,217 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable      - Free slot TaskSlot(index:3, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.7976931348623157E308, heapMemoryInMB=2147483647, directMemoryInMB=2147483647, nativeMemoryInMB=2147483647, networkMemoryInMB=2147483647}, allocationId: e3e7b383fe2db6376c82e5f3be7e02cb, jobId: eff57179c5c0e7d475c3b69d1a063017).
2019-05-17 17:42:50,217 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Remove job eff57179c5c0e7d475c3b69d1a063017 from job leader monitoring.
2019-05-17 17:42:50,217 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close JobManager connection for job eff57179c5c0e7d475c3b69d1a063017.
2019-05-17 17:42:50,222 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close JobManager connection for job eff57179c5c0e7d475c3b69d1a063017.
2019-05-17 17:42:50,222 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Cannot reconnect to job eff57179c5c0e7d475c3b69d1a063017 because it is not registered.
2019-05-17 17:43:38,982 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close ResourceManager connection 2c23394abcc92fbc068529591fbf7ceb.
2019-05-17 17:43:38,982 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Connecting to ResourceManager akka.tcp://flink@ip-10-30-52-224.ec2.internal:35979/user/resourcemanager(00000000000000000000000000000000).
2019-05-17 17:43:38,988 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
2019-05-17 17:43:38,988 INFO  org.apache.flink.runtime.blob.PermanentBlobCache              - Shutting down BLOB cache
2019-05-17 17:43:38,989 INFO  org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  - Shutting down TaskExecutorLocalStateStoresManager.
2019-05-17 17:43:38,990 INFO  org.apache.flink.runtime.blob.TransientBlobCache              - Shutting down BLOB cache
2019-05-17 17:43:38,991 INFO  org.apache.flink.runtime.filecache.FileCache                  - removed file cache directory /mnt/yarn/usercache/hadoop/appcache/application_1558074033518_0003/flink-dist-cache-a5e9444f-6bb6-4da1-9067-8d2e7351cb61
2019-05-17 17:43:38,991 INFO  org.apache.flink.runtime.filecache.FileCache                  - removed file cache directory /mnt1/yarn/usercache/hadoop/appcache/application_1558074033518_0003/flink-dist-cache-925eed9f-fcf5-4d48-8ac9-bce29e9116ef
2019-05-17 17:43:39,004 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Resolved ResourceManager address, beginning registration
2019-05-17 17:43:39,004 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Registration at ResourceManager attempt 1 (timeout=100ms)
2019-05-17 17:43:39,012 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Registration at ResourceManager was declined: unrecognized TaskExecutor
2019-05-17 17:43:39,012 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Pausing and re-attempting registration in 30000 ms

And in the replacement TM that was started, it fails with:

2019-05-17 17:45:12,048 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  Map (Key Extractor) (34/96)
java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'ip-10-47-197-146.ec2.internal/10.47.197.146:39133' has failed. This might indicate that the remote task manager has been lost.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)

Where the TM it’s trying to connect to is the one that was released and hasn’t been restarted yet.
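
The timing in the JobManager excerpts above can be checked with a few lines of plain Java. This is only a sketch for reading the logs: the class name LogGap is made up, and the timestamp pattern is assumed from the log lines shown in this post.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for measuring the gap between two log timestamps.
final class LogGap {
    // Timestamp layout as it appears in the log excerpts, e.g. "2019-05-17 17:42:50,215".
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Milliseconds elapsed from the earlier timestamp to the later one. */
    static long millisBetween(String earlier, String later) {
        LocalDateTime a = LocalDateTime.parse(earlier, LOG_TS);
        LocalDateTime b = LocalDateTime.parse(later, LOG_TS);
        return Duration.between(a, b).toMillis();
    }

    public static void main(String[] args) {
        // "Releasing idle slot" -> first "Stopping container"
        System.out.println(millisBetween("2019-05-17 17:42:50,215", "2019-05-17 17:43:38,942"));
        // first "Stopping container" -> first "Requesting new TaskExecutor container"
        System.out.println(millisBetween("2019-05-17 17:43:38,942", "2019-05-17 17:45:01,655"));
    }
}
```

For the lines above this gives 48727 ms between the idle slot being released and the first container being stopped, and 82713 ms between that stop and the first request for a replacement container.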

3. Hang in processing

Sometimes it finishes the long-running (10 hour) operator, and then the two downstream operators get stuck (these have a different parallelism, so there’s a rebalance).

In the most recent example of this, they processed about 20% of the data emitted by the long-running operator. There are no errors in any of the logs. The last real activity in the jobmanager.log shows that all of the downstream operators were deployed...

2019-06-22 14:58:36,648 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN Map (Packed features) -> Map (Key Extractor) (7/32) (4a13a1d471c0ed5c2d9e66d2e4a98fd9) switched from DEPLOYING to RUNNING.

Then nothing anywhere, until this message starts appearing in the log file every 5 seconds or so…

2019-06-22 22:56:11,303 INFO  org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl         - Updating with new AMRMToken



Re: Batch mode with Flink 1.8 unstable?

Stephan Ewen
Hi Ken!

Sorry to hear you are going through this experience. Because the major focus so far has been on streaming, the DataSet API has stability issues at scale.
So, yes, batch mode in the current Flink version can be somewhat tricky.

A big focus of Flink 1.9 is to finally fix batch mode by addressing batch-specific scheduling, recovery, and shuffle issues.

Let me go through the issues you found:

(1) Input splits and oversized RPC

Your explanation seems correct: the timeout is caused by an oversized RPC message being dropped.
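
A minimal sketch of that failure mode, assuming only what the stack trace shows (a CompletableFuture.get with a timeout inside RpcInputSplitProvider): if the reply is discarded on the way, nothing ever completes the future and the caller only sees a TimeoutException. The class and method names are hypothetical and no Flink code is involved.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo; it only mimics a caller waiting for a reply that was dropped.
final class DroppedReplyDemo {
    /** Waits for a reply that never arrives; returns true if the wait timed out. */
    static boolean timesOutWhenReplyIsDropped(long timeoutMillis) {
        // Stands in for the pending RPC response. Nobody completes it,
        // just as nobody would if the response message were discarded.
        CompletableFuture<byte[]> reply = new CompletableFuture<>();
        try {
            reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            // The caller learns nothing about why the reply is missing.
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("timed out: " + timesOutWhenReplyIsDropped(100));
    }
}
```

The point of the sketch is that the timeout on the requesting side carries no hint about the cause, so the sender's logs are where an oversized-payload warning would show up.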

I don't quite understand how that exactly happens, because the size limit is 10 MB and input splits should be rather small in most cases.
Are you running custom sources which put large data into splits? Maybe accidentally, by having a large serialized closure in the splits?

The fix would be this issue: https://issues.apache.org/jira/browse/FLINK-4399  
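
To check whether a split accidentally carries a lot of data, one rough approach is to measure its serialized size and compare it with the 10 MB limit. The sketch below uses plain Java serialization and a made-up FatSplit class as stand-ins; the size that Akka actually sees for the encoded message may differ somewhat, so treat the result as an estimate.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical size check; FatSplit is an illustration, not a Flink class.
final class SplitSizeCheck {
    // The 10 MB limit quoted in the thread (10485760 bytes).
    static final long FRAME_LIMIT_BYTES = 10_485_760L;

    /** A split-like object that drags a large payload along with it. */
    static final class FatSplit implements Serializable {
        private static final long serialVersionUID = 1L;
        final int splitNumber;
        final byte[] capturedData;

        FatSplit(int splitNumber, byte[] capturedData) {
            this.splitNumber = splitNumber;
            this.capturedData = capturedData;
        }
    }

    /** Size in bytes of the object when written with Java serialization. */
    static long serializedSize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    static boolean exceedsFrameLimit(Serializable obj) {
        return serializedSize(obj) > FRAME_LIMIT_BYTES;
    }

    public static void main(String[] args) {
        FatSplit small = new FatSplit(0, new byte[16]);
        FatSplit large = new FatSplit(1, new byte[15 * 1024 * 1024]);
        System.out.println("small exceeds limit: " + exceedsFrameLimit(small));
        System.out.println("large exceeds limit: " + exceedsFrameLimit(large));
    }
}
```

Running the same measurement on the real split objects (or on whatever they reference) should show quickly whether a large closure is being serialized along with them.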

(2) TM early release

The 1.8 version had a fix that should work for regular cases without fine-grained failure recovery.
1.9 should have a more general fix that also works for fine-grained recovery.

Are you trying to use fine-grained failover with the batch job?
Fine-grained failover does not work for batch in 1.8, which is why it is not an advertised feature (it only works for streaming so far).

The goal is for this to work in the 1.9 release (aka the batch fixup release).

(3) Hang in Processing

I think a thread dump (jstack) from the TMs would be helpful to diagnose that.
There are known issues with the current batch shuffle implementation, which is why 1.9 is getting a new bounded-blocking stream shuffle implementation.
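
Running jstack against the TM process id is the simplest way to get that dump. Where attaching a tool to the container is awkward, roughly the same information can be collected from inside the JVM. The sketch below uses the standard ThreadMXBean API; the class name is hypothetical, and wiring it into a running TaskManager is left open.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper that renders a jstack-like dump of the current JVM.
final class InProcessThreadDump {
    /** Returns thread names, states and full stack traces for all live threads. */
    static String dump() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        StringBuilder sb = new StringBuilder();
        // false/false: skip lock details, which keeps the call cheap and widely supported.
        for (ThreadInfo info : threads.dumpAllThreads(false, false)) {
            sb.append('"').append(info.getThreadName()).append("\" ")
              .append(info.getThreadState()).append('\n');
            // Print every frame by hand; ThreadInfo.toString() cuts long stacks short.
            for (StackTraceElement frame : info.getStackTrace()) {
                sb.append("\tat ").append(frame).append('\n');
            }
            sb.append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(dump());
    }
}
```

For a hang like the one described, the interesting part is usually which threads sit in WAITING or BLOCKED and what the top few frames of those threads are.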

Best,
Stephan






On Mon, Jun 24, 2019 at 2:32 AM Ken Krugler <[hidden email]> wrote:
Hi all,

I’ve been running a somewhat complex batch job (in EMR/YARN) with Flink 1.8.0, and it regularly fails, but for varying reasons.




Re: Batch mode with Flink 1.8 unstable?

qi luo
Hi Stephan,

We have run into issues similar to those Ken described. Can we expect all of them to be fixed in 1.9?

Thanks,
Qi

On Jun 26, 2019, at 10:50 PM, Stephan Ewen <[hidden email]> wrote:

Hi Ken!

Sorry to hear you are going through this experience. Because the major focus so far has been on streaming, the DataSet API has stability issues at scale.
So, yes, batch mode in the current Flink version can be somewhat tricky.

A big focus of Flink 1.9 is to finally fix batch mode by addressing batch-specific scheduling, recovery, and shuffle issues.

Let me go through the issues you found:

(1) Input splits and oversized RPC

Your explanation seems correct: the timeout is caused by an oversized RPC message being dropped.

I don't quite understand how exactly that happens, because the size limit is 10 MB and input splits should be rather small in most cases.
Are you running custom sources that put large amounts of data into the splits? Maybe accidentally, by having a large serialized closure in the splits?

The fix would be this issue: https://issues.apache.org/jira/browse/FLINK-4399  
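
One way to check whether a split could be hitting the limit is to measure its serialized size. The sketch below is only an illustration under assumptions: it assumes the split is shipped using plain Java serialization, and `SplitSizeCheck`, `serializedSize`, and `exceedsFrameSize` are hypothetical helper names, not Flink APIs. The 10 MB constant mirrors the default limit mentioned above.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical helper (not part of Flink): estimates how large an object
// becomes under plain Java serialization and compares it to the 10 MB limit.
final class SplitSizeCheck {

    // 10 MB, the default RPC size limit discussed in this thread.
    static final long DEFAULT_FRAME_SIZE_BYTES = 10L * 1024 * 1024;

    // Serializes the object into memory and returns the number of bytes written.
    static long serializedSize(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.size();
    }

    // True if the serialized form would not fit into the default frame size.
    static boolean exceedsFrameSize(Serializable obj) throws IOException {
        return serializedSize(obj) > DEFAULT_FRAME_SIZE_BYTES;
    }

    public static void main(String[] args) throws IOException {
        // Stand-ins for a small split and a split that drags a large closure along.
        byte[] smallPayload = new byte[1024];
        byte[] largePayload = new byte[15 * 1024 * 1024];
        System.out.println("small exceeds limit: " + exceedsFrameSize(smallPayload));
        System.out.println("large exceeds limit: " + exceedsFrameSize(largePayload));
    }
}
```

Calling `serializedSize` on the objects returned by a custom source's split creation (for example in a unit test) would show whether any single split comes close to the limit.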

(2) TM early release

The 1.8 version has a fix that should work for regular cases without fine-grained failure recovery.
1.9 should have a more general fix that also works for fine-grained recovery.

Are you trying to use fine-grained failover with the batch job?
Fine-grained failover does not work for batch in 1.8, which is why it is not an advertised feature (it only works for streaming so far).

The goal is for this to work in the 1.9 release (aka the batch fixup release).

(3) Hang in Processing

I think a thread dump (jstack) from the TMs would be helpful to diagnose that.
There are known issues with the current batch shuffle implementation, which is why 1.9 is getting a new bounded-blocking stream shuffle implementation.

Best,
Stephan







Re: Batch mode with Flink 1.8 unstable?

Biao Liu
In reply to this post by Stephan Ewen
Hi Ken,

Regarding the oversized input splits: this seems to be a rare case that I did not expect. However, it should definitely be fixed, since input splits can be user-defined and we should not assume they are always small.
I agree with Stephan that something unexpected may be ending up in the input splits.
Until we fix it, there is also a workaround: increase the RPC message size limit by explicitly configuring "akka.framesize" in flink-conf.yaml.
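
As a rough sketch, the entry in flink-conf.yaml might look like the following. The 20 MB value is only an example chosen for illustration; it would need to be larger than the biggest serialized split, and the default is 10485760b (10 MB).

```yaml
# flink-conf.yaml
# Example value only (20 MB); the default is 10485760b (10 MB).
akka.framesize: 20971520b
```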

To @Qi, I'm also sorry to hear about your bad experience. I'll take this issue, but I'm not sure I can finish it in time for the 1.9 release. Hope things go well.


Stephan Ewen <[hidden email]> 于2019年6月26日周三 下午10:50写道:
Hi Ken!

Sorry to hear you are going through this experience. The major focus on streaming so far means that the DataSet API has stability issues at scale.
So, yes, batch mode in current Flink version can be somewhat tricky.

It is a big focus of Flink 1.9 to fix the batch mode, finally, and by addressing batch specific scheduling / recovery / and shuffle issues.

Let me go through the issues you found:

(1) Input splits and oversized RPC

Your explanation seems correct, timeout due to dropping oversized RPC message.

I don't quite understand how that exactly happens, because the size limit is 10 MB and input splits should be rather small in most cases.
Are you running custom sources which put large data into splits? Maybe accidentally, by having a large serialized closure in the splits?

The fix would be this issue: https://issues.apache.org/jira/browse/FLINK-4399  

(2) TM early release

The 1.8 version had a fix that should work for regular cases without fine-grained failure recovery.
1.9 should have a more general fix that also works for fine-grained recovery

Are you trying to use the finer grained failover with the batch job?
The finer-grained failover is not working in batch for 1.8, that is why it is not an advertised feature (it only works for streaming so far).

The goal is that this works in the 1.9 release (aka the batch fixup release)

(3) Hang in Processing

I think a thread dump (jstack) from the TMs would be helpful to diagnose that.
There are known issues with the current batch shuffle implementation, which is why 1.9 is getting a new bounded-blocking stream shuffle implementation.

Best,
Stephan






On Mon, Jun 24, 2019 at 2:32 AM Ken Krugler <[hidden email]> wrote:
Hi all,

I’ve been running a somewhat complex batch job (in EMR/YARN) with Flink 1.8.0, and it regularly fails, but for varying reasons.

Has anyone else had stability with 1.8.0 in batch mode and non-trivial workflows?

Thanks,

— Ken

1. TimeoutException getting input splits

The batch job starts by processing a lot of files that live in S3. During this phase, I sometimes see:

2019-06-20 01:20:22,659 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN DataSource (at createInput(ExecutionEnvironment.java:549) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> Map (ad dailies) -> Filter (Filter at createWorkflow(AdvertiserSimilarityWorkflow.java:34)) -> Filter (Filter at createWorkflow(AdvertiserSimilarityWorkflow.java:36)) -> Filter (Filter at createWorkflow(AdvertiserSimilarityWorkflow.java:38)) -> Map (Key Extractor) -> Combine (Reduce at createWorkflow(AdvertiserSimilarityWorkflow.java:41)) (31/32) (8a8cbea47394c3d638910c36ac62d877) switched from RUNNING to FAILED.
java.lang.RuntimeException: Could not retrieve next input split.
at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
... 3 more
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
... 4 more
2019-06-20 01:20:22,664 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Java Job at Thu Jun 20 01:11:28 UTC 2019 (5564b8980f40d788d7ef312318709e4d) switched from state RUNNING to FAILING.
java.lang.RuntimeException: Could not retrieve next input split.
at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
... 3 more
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
... 4 more

I saw [hidden email]’s email recently about a similar issue:

I figured this out myself. In my yarn container logs I saw this warning/error,

akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@HOST:43911/temp/$n]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.jobmaster.SerializedInputSplit was 15728643 bytes.

Looking into this there is a max frame size for Akka which in flink can be set with akka.framesize and is set to 10MB by default. Increasing this past the size of my side input fixed the issue. I'm guessing this is due to creating the side input PCollection from memory using the Create.of APIs.

But no such akka.remote.OversizedPayloadException appears in any of my log files.

2. TM released too soon?

Sometimes it fails with "Connecting the channel failed: Connecting to remote task manager xxx has failed. This might indicate that the remote task manager has been lost”

I’d run into this previously with 1.7.2, but thought that 1.8.0 had the fix for https://issues.apache.org/jira/browse/FLINK-10941, and thus I’d avoid the problem, but it seems like there’s still an issue.

I’m running 3 TMs on three servers, each with 32 slots. When the job fails, the servers are under heavy CPU load.

From the logs, I see the JobManager releasing two of the TMs, then requesting two new containers. One of these requests gets filled, and that new TM starts getting tasks for its slots.

But then soon afterwards that new TM and the one original TM still left around start failing because they aren’t getting data from (I think) the other TM that was released.

Any thoughts on what’s going wrong? Is the bug not actually fully fixed? Or is there some TM timeout value that I should bump?

In the job manager log file I see where the two TMs are getting released...

2019-05-17 17:42:50,215 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl      - Releasing idle slot [d947cd800b0ef2671259c7b048c3f7fc].
2019-05-17 17:43:38,942 INFO  org.apache.flink.yarn.YarnResourceManager                     - Stopping container container_1558074033518_0003_01_000002.
2019-05-17 17:43:38,978 INFO  org.apache.flink.yarn.YarnResourceManager                     - Closing TaskExecutor connection container_1558074033518_0003_01_000002 because: TaskExecutor exceeded the idle timeout.
2019-05-17 17:43:38,978 INFO  org.apache.flink.yarn.YarnResourceManager                     - Stopping container container_1558074033518_0003_01_000004.
2019-05-17 17:43:38,998 INFO  org.apache.flink.yarn.YarnResourceManager                     - Closing TaskExecutor connection container_1558074033518_0003_01_000004 because: TaskExecutor exceeded the idle timeout.
2019-05-17 17:43:39,005 WARN  org.apache.flink.yarn.YarnResourceManager                     - Discard registration from TaskExecutor container_1558074033518_0003_01_000002 at (akka.tcp://[hidden email]:36311/user/taskmanager_0) because the framework did not recognize it
2019-05-17 17:43:39,006 WARN  org.apache.flink.yarn.YarnResourceManager                     - Discard registration from TaskExecutor container_1558074033518_0003_01_000004 at (akka.tcp://[hidden email]:44403/user/taskmanager_0) because the framework did not recognize it

And then later on the requests for the replacement TMs.

2019-05-17 17:45:01,655 INFO  org.apache.flink.yarn.YarnResourceManager                     - Requesting new TaskExecutor container with resources <memory:44000, vCores:32>. Number pending requests 1.
2019-05-17 17:45:01,662 INFO org.apache.flink.yarn.YarnResourceManager - Requesting new TaskExecutor container with resources <memory:44000, vCores:32>. Number pending requests 2.

And then one of the requests is satisfied:

2019-05-17 17:45:04,360 INFO  org.apache.flink.yarn.YarnResourceManager                     - Received new container: container_1558074033518_0003_01_000006 - Remaining pending container requests: 2
2019-05-17 17:45:04,360 INFO  org.apache.flink.yarn.YarnResourceManager                     - Removing container request Capability[<memory:44000, vCores:32>]Priority[1]. Pending container requests 1.
2019-05-17 17:45:04,836 INFO  org.apache.flink.yarn.YarnResourceManager                     - Creating container launch context for TaskManagers
2019-05-17 17:45:04,837 INFO  org.apache.flink.yarn.YarnResourceManager                     - Starting TaskManagers

So it seems like TMs are being allocated, but soon afterwards:

2019-05-17 17:45:12,907 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN Map (Map at createWorkflow(AdvertiserSimilarityWorkflow.java:127)) -> Map (Key Extractor) (73/96) (1e16d6ca293330933b7cece67644635f) switched from RUNNING to FAILED.
java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'ip-10-28-81-66.ec2.internal/10.28.81.66:40317' has failed. This might indicate that the remote task manager has been lost.
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:133)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:69)
	at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:60)
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:166)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:494)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:525)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508)
	at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
	at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
	at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:101)
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)

On one of the TMs that was released, I see at the end of its log:

2019-05-17 17:42:50,217 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable      - Free slot TaskSlot(index:3, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.7976931348623157E308, heapMemoryInMB=2147
483647, directMemoryInMB=2147483647, nativeMemoryInMB=2147483647, networkMemoryInMB=2147483647}, allocationId: e3e7b383fe2db6376c82e5f3be7e02cb, jobId: eff57179c5c0e7d475c3b69d1a063017).
2019-05-17 17:42:50,217 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Remove job eff57179c5c0e7d475c3b69d1a063017 from job leader monitoring.
2019-05-17 17:42:50,217 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close JobManager connection for job eff57179c5c0e7d475c3b69d1a063017.
2019-05-17 17:42:50,222 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close JobManager connection for job eff57179c5c0e7d475c3b69d1a063017.
2019-05-17 17:42:50,222 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Cannot reconnect to job eff57179c5c0e7d475c3b69d1a063017 because it is not registered.
Re: Batch mode with Flink 1.8 unstable?

Biao Liu
Hi Ken again,

Regarding the TimeoutException, I just realized that there is no akka.remote.OversizedPayloadException in your log file, so something else may be causing this.
1. Have you tried increasing "akka.ask.timeout"?
2. Have you checked garbage collection on the JM/TM? You may need to enable GC logging first.
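
For reference, a minimal flink-conf.yaml sketch of both suggestions. The values are illustrative assumptions, not recommendations: "akka.ask.timeout" defaults to 10 s in Flink 1.8, the GC flags are the standard Java 8 HotSpot ones, and the log path is hypothetical.

```yaml
# flink-conf.yaml (sketch; values are assumptions to adapt)

# 1. Give RPC "ask" calls more time before they fail with a TimeoutException.
akka.ask.timeout: 60 s

# 2. Turn on GC logging for the JM and TM JVMs (Java 8 flags).
#    The log path is hypothetical; pick one that is writable in your containers.
env.java.opts: -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/tmp/flink-gc.log
```

Long pauses in the GC log around the time of the failure would point towards (2) rather than towards a dropped message.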


On Thu, Jun 27, 2019 at 11:38 AM Biao Liu <[hidden email]> wrote:
Hi Ken,

Regarding oversized input splits, this seems to be a rare case that I did not expect. However, it should definitely be fixed, since input splits can be user-defined and we should not assume they are small.
I agree with Stephan that something unexpected may be ending up in the input splits.
Until we fix it, there is a workaround: increase the RPC message size limit by explicitly configuring "akka.framesize" in flink-conf.yaml.
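
A minimal sketch of that workaround. The 20 MB value is an arbitrary illustration; the only figure taken from this thread is the 10 MB default.

```yaml
# flink-conf.yaml (sketch)
# Default is 10485760b (10 MB), the limit mentioned in this thread.
# 20 MB is an arbitrary illustrative value.
akka.framesize: 20971520b
```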

@Qi, sorry to hear about your bad experience as well. I'll take this issue, but I'm not sure I can get it done in time for the 1.9 release. Hope things go well.


On Wed, Jun 26, 2019 at 10:50 PM Stephan Ewen <[hidden email]> wrote:
Hi Ken!

Sorry to hear you are going through this experience. The major focus on streaming so far means that the DataSet API has stability issues at scale.
So, yes, batch mode in the current Flink version can be somewhat tricky.

A big focus of Flink 1.9 is to finally fix batch mode by addressing batch-specific scheduling, recovery, and shuffle issues.

Let me go through the issues you found:

(1) Input splits and oversized RPC

Your explanation seems correct: the timeout is due to an oversized RPC message being dropped.

I don't quite understand how that exactly happens, because the size limit is 10 MB and input splits should be rather small in most cases.
Are you running custom sources which put large data into splits? Maybe accidentally, by having a large serialized closure in the splits?

The fix would be this issue: https://issues.apache.org/jira/browse/FLINK-4399  
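
One way to check whether a split (or something it drags along, such as a captured closure) is unexpectedly large is to serialize it locally and compare the size against the frame limit. The sketch below uses only the JDK and assumes the split is shipped with plain Java serialization; the class and method names are hypothetical and not part of Flink.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SplitSizeCheck {

    // Default akka.framesize quoted in this thread (10 MB).
    public static final long DEFAULT_FRAME_SIZE_BYTES = 10_485_760L;

    /** Returns the number of bytes produced by Java-serializing the object. */
    public static long serializedSize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    /** True if the serialized object alone is already larger than the frame size. */
    public static boolean exceedsFrameSize(Serializable obj, long frameSizeBytes) {
        return serializedSize(obj) > frameSizeBytes;
    }

    public static void main(String[] args) {
        // Stand-ins for real splits: a short path string and a large in-memory payload.
        String smallSplit = "s3://bucket/part-00000";
        byte[] largePayload = new byte[11 * 1024 * 1024];

        System.out.println("small split bytes:   " + serializedSize(smallSplit));
        System.out.println("large payload bytes: " + serializedSize(largePayload));
        System.out.println("large exceeds limit: "
                + exceedsFrameSize(largePayload, DEFAULT_FRAME_SIZE_BYTES));
    }
}
```

Calling serializedSize on the objects returned by the input format's createInputSplits in a local test would show whether any of them come close to the limit.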

(2) TM early release

The 1.8 version had a fix that should work for regular cases without fine-grained failure recovery.
1.9 should have a more general fix that also works for fine-grained recovery.

Are you trying to use the finer-grained failover with the batch job?
The finer-grained failover is not working in batch for 1.8, which is why it is not an advertised feature (it only works for streaming so far).

The goal is that this works in the 1.9 release (aka the batch fixup release)

(3) Hang in Processing

I think a thread dump (jstack) from the TMs would be helpful to diagnose that.
There are known issues with the current batch shuffle implementation, which is why 1.9 is getting a new bounded-blocking stream shuffle implementation.
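
jstack is run from outside the process against the TM's PID. Where attaching to the container is awkward, roughly the same information can be collected from inside the JVM with the JDK's ThreadMXBean. The sketch below is that in-process alternative, not jstack itself; the class name is hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class ThreadDumpSketch {

    /** Returns a jstack-like dump of all live threads in the current JVM. */
    public static String dump() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        // Ask for lock details only where the JVM supports them.
        ThreadInfo[] infos = mx.dumpAllThreads(
                mx.isObjectMonitorUsageSupported(),
                mx.isSynchronizerUsageSupported());

        StringBuilder sb = new StringBuilder();
        for (ThreadInfo info : infos) {
            sb.append('"').append(info.getThreadName()).append("\" ")
              .append(info.getThreadState());
            if (info.getLockName() != null) {
                // The lock or condition this thread is blocked or waiting on.
                sb.append(" on ").append(info.getLockName());
            }
            sb.append('\n');
            for (StackTraceElement frame : info.getStackTrace()) {
                sb.append("    at ").append(frame).append('\n');
            }
            sb.append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(dump());
    }
}
```

For a hang like this one, the threads of interest are the task threads that stay BLOCKED or WAITING in network or shuffle code across several dumps taken a few seconds apart.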

Best,
Stephan






Re: Batch mode with Flink 1.8 unstable?

Till Rohrmann
Hi Ken,

In order to debug your problems further, it would be helpful if you could share the log files at DEBUG level with us.

For problem (2), I suspect that it has been caused by Flink releasing TMs too early. This should be fixed with FLINK-10941, which is part of Flink 1.8.1. The 1.8.1 release should be out very soon. It would be great if you could try your program with this version, or even the 1.8.1 RC, to see whether the problem still occurs. But it could also be caused by using fine-grained recovery, so it might be worth disabling this feature if you turned it on.
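
A flink-conf.yaml sketch of the settings involved, using the key names documented for Flink 1.8. The timeout value is an illustrative assumption, and raising it is an untested guess for this job rather than a confirmed fix.

```yaml
# flink-conf.yaml (sketch)

# "full" restarts the whole job on a task failure and is the default;
# fine-grained recovery is only active if this was set to another value.
jobmanager.execution.failover-strategy: full

# How long a TaskManager may sit idle before it is released (default 30000 ms).
resourcemanager.taskmanager-timeout: 120000
```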

Thanks a lot!

Cheers,
Till

On Thu, Jun 27, 2019 at 8:30 AM Biao Liu <[hidden email]> wrote:
Hi Ken again,

In regard to TimeoutException, I just realized that there is no akka.remote.OversizedPayloadException in your log file. There might be some other reason caused this.
1. Have you ever tried increasing the configuration "akka.ask.timeout"? 
2. Have you ever checked the garbage collection of JM/TM? Maybe you need to enable printing GC log first.


Biao Liu <[hidden email]> 于2019年6月27日周四 上午11:38写道:
Hi Ken,

In regard to oversized input splits, it seems to be a rare case beyond my expectation. However it should be fixed definitely since input split can be user-defined. We should not assume it must be small. 
I agree with Stephan that maybe there is something unexpectedly involved in the input splits.
And there is also a work-around way to solve this before we fixing it, increasing the limit of RPC message size by explicitly configuring "akka.framesize" in flink-conf.yaml.

To @Qi, also sorry to hear your bad experience. I'll take this issue but I'm not sure I could catch up the releasing of 1.9. Hope things go well.


Stephan Ewen <[hidden email]> 于2019年6月26日周三 下午10:50写道:
Hi Ken!

Sorry to hear you are going through this experience. The major focus on streaming so far means that the DataSet API has stability issues at scale.
So, yes, batch mode in current Flink version can be somewhat tricky.

It is a big focus of Flink 1.9 to fix the batch mode, finally, and by addressing batch specific scheduling / recovery / and shuffle issues.

Let me go through the issues you found:

(1) Input splits and oversized RPC

Your explanation seems correct, timeout due to dropping oversized RPC message.

I don't quite understand how that exactly happens, because the size limit is 10 MB and input splits should be rather small in most cases.
Are you running custom sources which put large data into splits? Maybe accidentally, by having a large serialized closure in the splits?

The fix would be this issue: https://issues.apache.org/jira/browse/FLINK-4399  

(2) TM early release

The 1.8 version had a fix that should work for regular cases without fine-grained failure recovery.
1.9 should have a more general fix that also works for fine-grained recovery

Are you trying to use the finer grained failover with the batch job?
The finer-grained failover is not working in batch for 1.8, that is why it is not an advertised feature (it only works for streaming so far).

The goal is that this works in the 1.9 release (aka the batch fixup release)

(3) Hang in Processing

I think a thread dump (jstack) from the TMs would be helpful to diagnose that.
There are known issues with the current batch shuffle implementation, which is why 1.9 is getting a new bounded-blocking stream shuffle implementation.

Best,
Stephan






On Mon, Jun 24, 2019 at 2:32 AM Ken Krugler <[hidden email]> wrote:
Hi all,

I’ve been running a somewhat complex batch job (in EMR/YARN) with Flink 1.8.0, and it regularly fails, but for varying reasons.

Has anyone else had stability with 1.8.0 in batch mode and non-trivial workflows?

Thanks,

— Ken

1. TimeoutException getting input splits

The batch job starts by processing a lot of files that live in S3. During this phase, I sometimes see:

2019-06-20 01:20:22,659 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN DataSource (at createInput(ExecutionEnvironment.java:549) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> Map (ad dailies) -> Filter (Filter at createWorkflow(AdvertiserSimilarityWorkflow.java:34)) -> Filter (Filter at createWorkflow(AdvertiserSimilarityWorkflow.java:36)) -> Filter (Filter at createWorkflow(AdvertiserSimilarityWorkflow.java:38)) -> Map (Key Extractor) -> Combine (Reduce at createWorkflow(AdvertiserSimilarityWorkflow.java:41)) (31/32) (8a8cbea47394c3d638910c36ac62d877) switched from RUNNING to FAILED.
java.lang.RuntimeException: Could not retrieve next input split.
at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
... 3 more
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
... 4 more
2019-06-20 01:20:22,664 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Java Job at Thu Jun 20 01:11:28 UTC 2019 (5564b8980f40d788d7ef312318709e4d) switched from state RUNNING to FAILING.
java.lang.RuntimeException: Could not retrieve next input split.
at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
... 3 more
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
... 4 more

I saw [hidden email]’s email recently about a similar issue:

I figured this out myself. In my yarn container logs I saw this warning/error,

akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@HOST:43911/temp/$n]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.jobmaster.SerializedInputSplit was 15728643 bytes.

Looking into this there is a max frame size for Akka which in flink can be set with akka.framesize and is set to 10MB by default. Increasing this past the size of my side input fixed the issue. I'm guessing this is due to creating the side input PCollection from memory using the Create.of APIs.

But no such akka.remote.OversizedPayloadException appears in any of my log files.

2. TM released too soon?

Sometimes it fails with "Connecting the channel failed: Connecting to remote task manager xxx has failed. This might indicate that the remote task manager has been lost”

I’d run into this previously with 1.7.2, but thought that 1.8.0 had the fix for https://issues.apache.org/jira/browse/FLINK-10941, and thus I’d avoid the problem, but it seems like there’s still an issue.

I’m running 3 TMs on three servers, each with 32 slots. When the job fails, the servers are under heavy CPU load.

From the logs, I see the JobManager releasing two of the TMs, then requesting two new containers. One of these requests gets filled, and that new TM starts getting tasks for its slots.

But then soon afterwards that new TM and the one original TM still left around start failing because they aren’t getting data from (I think) the other TM that was released.

Any thoughts on what’s going wrong? Is the bug not actually fully fixed? Or is there some TM timeout value that I should bump?

In the job manager log file I see where the two TMs are getting released...

2019-05-17 17:42:50,215 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl      - Releasing idle slot [d947cd800b0ef2671259c7b048c3f7fc].
2019-05-17 17:43:38,942 INFO  org.apache.flink.yarn.YarnResourceManager                     - Stopping container container_1558074033518_0003_01_000002.
2019-05-17 17:43:38,978 INFO  org.apache.flink.yarn.YarnResourceManager                     - Closing TaskExecutor connection container_1558074033518_0003_01_000002 because: TaskExecutor exceeded the idle timeout.
2019-05-17 17:43:38,978 INFO  org.apache.flink.yarn.YarnResourceManager                     - Stopping container container_1558074033518_0003_01_000004.
2019-05-17 17:43:38,998 INFO  org.apache.flink.yarn.YarnResourceManager                     - Closing TaskExecutor connection container_1558074033518_0003_01_000004 because: TaskExecutor exceeded the idle timeout.
2019-05-17 17:43:39,005 WARN  org.apache.flink.yarn.YarnResourceManager                     - Discard registration from TaskExecutor container_1558074033518_0003_01_000002 at (akka.tcp://[hidden email]:36311/user/taskmanager_0) because the framework did not recognize it
2019-05-17 17:43:39,006 WARN  org.apache.flink.yarn.YarnResourceManager                     - Discard registration from TaskExecutor container_1558074033518_0003_01_000004 at (akka.tcp://[hidden email]:44403/user/taskmanager_0) because the framework did not recognize it

And then later on the requests for the replacement TMs.

2019-05-17 17:45:01,655 INFO  org.apache.flink.yarn.YarnResourceManager                     - Requesting new TaskExecutor container with resources <memory:44000, vCores:32>. Number pending requests 1.
2019-05-17 17:45:01,662 INFO org.apache.flink.yarn.YarnResourceManager - Requesting new TaskExecutor container with resources <memory:44000, vCores:32>. Number pending requests 2.

And then one of the requests is satisfied:

2019-05-17 17:45:04,360 INFO  org.apache.flink.yarn.YarnResourceManager                     - Received new container: container_1558074033518_0003_01_000006 - Remaining pending container requests: 2
2019-05-17 17:45:04,360 INFO  org.apache.flink.yarn.YarnResourceManager                     - Removing container request Capability[<memory:44000, vCores:32>]Priority[1]. Pending container requests 1.
2019-05-17 17:45:04,836 INFO  org.apache.flink.yarn.YarnResourceManager                     - Creating container launch context for TaskManagers
2019-05-17 17:45:04,837 INFO  org.apache.flink.yarn.YarnResourceManager                     - Starting TaskManagers

So it seems like TMs are being allocated, but soon afterwards:

2019-05-17 17:45:12,907 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN Map (Map at createWorkflow(AdvertiserSimilarityWorkflow.java:127)) -> Map (Key Extractor) (73/96) (1e16d6ca293330933b7cece67644635f) switched from RUNNING to FAILED.
java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'ip-10-28-81-66.ec2.internal/10.28.81.66:40317' has failed. This might indicate that the remote task manager has been lost.
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:133)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:69)
	at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:60)
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:166)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:494)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:525)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508)
	at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
	at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
	at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:101)
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)

On one of the TMs that was released, I see at the end of its log:

2019-05-17 17:42:50,217 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable      - Free slot TaskSlot(index:3, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.7976931348623157E308, heapMemoryInMB=2147483647, directMemoryInMB=2147483647, nativeMemoryInMB=2147483647, networkMemoryInMB=2147483647}, allocationId: e3e7b383fe2db6376c82e5f3be7e02cb, jobId: eff57179c5c0e7d475c3b69d1a063017).
2019-05-17 17:42:50,217 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Remove job eff57179c5c0e7d475c3b69d1a063017 from job leader monitoring.
2019-05-17 17:42:50,217 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close JobManager connection for job eff57179c5c0e7d475c3b69d1a063017.
2019-05-17 17:42:50,222 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close JobManager connection for job eff57179c5c0e7d475c3b69d1a063017.
2019-05-17 17:42:50,222 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Cannot reconnect to job eff57179c5c0e7d475c3b69d1a063017 because it is not registered.
2019-05-17 17:43:38,982 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close ResourceManager connection 2c23394abcc92fbc068529591fbf7ceb.
2019-05-17 17:43:38,982 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Connecting to ResourceManager akka.tcp://[hidden email]:35979/user/resourcemanager(00000000000000000000000000000000).
2019-05-17 17:43:38,988 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
2019-05-17 17:43:38,988 INFO  org.apache.flink.runtime.blob.PermanentBlobCache              - Shutting down BLOB cache
2019-05-17 17:43:38,989 INFO  org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  - Shutting down TaskExecutorLocalStateStoresManager.
2019-05-17 17:43:38,990 INFO  org.apache.flink.runtime.blob.TransientBlobCache              - Shutting down BLOB cache
2019-05-17 17:43:38,991 INFO  org.apache.flink.runtime.filecache.FileCache                  - removed file cache directory /mnt/yarn/usercache/hadoop/appcache/application_1558074033518_0003/flink-dist-cache-a5e9444f-6bb6-4da1-9067-8d2e7351cb61
2019-05-17 17:43:38,991 INFO  org.apache.flink.runtime.filecache.FileCache                  - removed file cache directory /mnt1/yarn/usercache/hadoop/appcache/application_1558074033518_0003/flink-dist-cache-925eed9f-fcf5-4d48-8ac9-bce29e9116ef
2019-05-17 17:43:39,004 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Resolved ResourceManager address, beginning registration
2019-05-17 17:43:39,004 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Registration at ResourceManager attempt 1 (timeout=100ms)
2019-05-17 17:43:39,012 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Registration at ResourceManager was declined: unrecognized TaskExecutor
2019-05-17 17:43:39,012 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Pausing and re-attempting registration in 30000 ms

And in the replacement TM that was started, it fails with:

2019-05-17 17:45:12,048 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  Map (Key Extractor) (34/96)
java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'ip-10-47-197-146.ec2.internal/10.47.197.146:39133' has failed. This might indicate that the remote task manager has been lost.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)

Where the TM it’s trying to connect to is the one that was released and hasn’t been restarted yet.

3. Hang in processing

Sometimes it finishes the long-running (10 hour) operator, and then the two downstream operators get stuck (these have a different parallelism, so there’s a rebalance).

In the most recent example of this, they processed about 20% of the data emitted by the long-running operator. There are no errors in any of the logs. The last real activity in the jobmanager.log shows that all of the downstream operators were deployed...

2019-06-22 14:58:36,648 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN Map (Packed features) -> Map (Key Extractor) (7/32) (4a13a1d471c0ed5c2d9e66d2e4a98fd9) switched from DEPLOYING to RUNNING.

Then nothing anywhere, until this message starts appearing in the log file every 5 seconds or so…

2019-06-22 22:56:11,303 INFO  org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl         - Updating with new AMRMToken




Re: Batch mode with Flink 1.8 unstable?

Till Rohrmann
Quick addition for problem (1): The AkkaRpcActor should serialize the response if it is a remote RPC and send an AkkaRpcException if the response's size exceeds the maximum frame size. This should be visible on the call site since the future should be completed with this exception. I'm wondering why you don't see this exception. 

It could be helpful to better understand the input splits your program is generating. Is it simply a `FileInputSplit` or did you implement a custom InputSplitAssigner with custom InputSplits?

Cheers,
Till

On Mon, Jul 1, 2019 at 11:57 AM Till Rohrmann <[hidden email]> wrote:
Hi Ken,

To debug your problems further, it would be helpful if you could share the log files at DEBUG level with us.

For problem (2), I suspect that it has been caused by Flink releasing TMs too early. This should be fixed with FLINK-10941, which is part of Flink 1.8.1. The 1.8.1 release should be out very soon. It would be great if you could try your program with this version, or even the 1.8.1 RC, to see whether the problem still occurs. But it could also be caused by using fine-grained recovery, so it might be worth disabling this feature if you turned it on.

Thanks a lot!

Cheers,
Till

On Thu, Jun 27, 2019 at 8:30 AM Biao Liu <[hidden email]> wrote:
Hi Ken again,

Regarding the TimeoutException, I just realized that there is no akka.remote.OversizedPayloadException in your log file. Something else might be causing this.
1. Have you tried increasing the configuration "akka.ask.timeout"?
2. Have you checked the garbage collection of the JM/TM? You may need to enable GC logging first.


On Thu, Jun 27, 2019 at 11:38 AM Biao Liu <[hidden email]> wrote:
Hi Ken,

Regarding oversized input splits: this seems to be a rare case that I did not expect. However, it should definitely be fixed, since input splits can be user-defined and we should not assume they are small.
I agree with Stephan that something unexpected may be ending up in the input splits.
Until we fix this, there is a workaround: raise the RPC message size limit by explicitly configuring "akka.framesize" in flink-conf.yaml.
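
To make the frame-size check concrete, here is a minimal sketch in plain Java. It assumes that the Java-serialized size of an object roughly approximates the RPC payload size. `FrameSizeCheck`, its methods, and the doubling headroom are hypothetical choices for illustration and are not part of Flink. The 10485760-byte limit is the default quoted in the OversizedPayloadException earlier in this thread; the trailing `b` in the suggested value is assumed to match the format Flink expects, so check it against the configuration docs for your version.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class FrameSizeCheck {
    // Default limit reported in the OversizedPayloadException above (10 MB).
    static final long DEFAULT_FRAME_SIZE_BYTES = 10_485_760L;

    // Size of an object after plain Java serialization (an approximation of the RPC payload).
    static long serializedSize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    static boolean exceedsDefaultFrameSize(long sizeBytes) {
        return sizeBytes > DEFAULT_FRAME_SIZE_BYTES;
    }

    // Candidate setting: payload size doubled for headroom (an arbitrary choice),
    // never below the default. The "b" suffix is an assumption about the value format.
    static String suggestedFrameSize(long sizeBytes) {
        long suggested = Math.max(DEFAULT_FRAME_SIZE_BYTES, sizeBytes * 2);
        return suggested + "b";
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for a large input split: 15 MB of zero bytes.
        byte[] standIn = new byte[15 * 1024 * 1024];
        long size = serializedSize(standIn);
        System.out.println("serialized size: " + size + " bytes");
        if (exceedsDefaultFrameSize(size)) {
            // Line that could be placed in flink-conf.yaml.
            System.out.println("akka.framesize: " + suggestedFrameSize(size));
        }
    }
}
```

Passing a real input split object to `serializedSize` instead of the stand-in array would show whether the splits are anywhere near the limit.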

To @Qi: sorry to hear about your bad experience as well. I'll take this issue, but I'm not sure I can finish it in time for the 1.9 release. Hope things go well.


On Wed, Jun 26, 2019 at 10:50 PM Stephan Ewen <[hidden email]> wrote:
Hi Ken!

Sorry to hear you are going through this experience. The major focus on streaming so far means that the DataSet API has stability issues at scale.
So, yes, batch mode in the current Flink version can be somewhat tricky.

A big focus of Flink 1.9 is to finally fix batch mode by addressing batch-specific scheduling, recovery, and shuffle issues.

Let me go through the issues you found:

(1) Input splits and oversized RPC

Your explanation seems correct: the timeout is due to an oversized RPC message being dropped.

I don't quite understand how that exactly happens, because the size limit is 10 MB and input splits should be rather small in most cases.
Are you running custom sources which put large data into splits? Maybe accidentally, by having a large serialized closure in the splits?

The fix would be this issue: https://issues.apache.org/jira/browse/FLINK-4399  

(2) TM early release

The 1.8 version had a fix that should work for regular cases without fine-grained failure recovery.
1.9 should have a more general fix that also works for fine-grained recovery.

Are you trying to use the finer-grained failover with the batch job?
The finer-grained failover is not working in batch for 1.8, which is why it is not an advertised feature (it only works for streaming so far).

The goal is that this works in the 1.9 release (aka the batch fixup release).

(3) Hang in Processing

I think a thread dump (jstack) from the TMs would be helpful to diagnose that.
There are known issues with the current batch shuffle implementation, which is why 1.9 is getting a new bounded-blocking stream shuffle implementation.
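
For readers unfamiliar with thread dumps, the sketch below shows roughly what one contains, using the JDK's `ThreadMXBean`. It is only an illustration: it dumps the threads of the JVM it runs in, whereas for a TaskManager one would normally run the `jstack` tool against the TM process ID. The class name `ThreadDumpSketch` is hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

class ThreadDumpSketch {
    // Returns a jstack-like text dump of all live threads in the current JVM.
    static String dumpAllThreads() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        // Only request lock details when the JVM supports them.
        ThreadInfo[] infos = threads.dumpAllThreads(
                threads.isObjectMonitorUsageSupported(),
                threads.isSynchronizerUsageSupported());
        StringBuilder sb = new StringBuilder();
        for (ThreadInfo info : infos) {
            sb.append('"').append(info.getThreadName()).append("\" ")
              .append(info.getThreadState()).append('\n');
            // A stuck task typically shows the same blocked frame across repeated dumps.
            for (StackTraceElement frame : info.getStackTrace()) {
                sb.append("\tat ").append(frame).append('\n');
            }
            sb.append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(dumpAllThreads());
    }
}
```

Taking two or three dumps a few seconds apart and comparing them is a common way to tell a thread that is genuinely stuck from one that is merely slow.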

Best,
Stephan










Re: Batch mode with Flink 1.8 unstable?

Ken Krugler
Hi Till,

Thanks for following up.

I’ve got answers to other emails on this thread pending, but wanted to respond to this one now.

On Jul 1, 2019, at 7:20 AM, Till Rohrmann <[hidden email]> wrote:

Quick addition for problem (1): The AkkaRpcActor should serialize the response if it is a remote RPC and send an AkkaRpcException if the response's size exceeds the maximum frame size. This should be visible on the call site since the future should be completed with this exception. I'm wondering why you don't see this exception. 

It could be helpful to better understand the input splits your program is generating. Is it simply a `FileInputSplit` or did you implement a custom InputSplitAssigner with custom InputSplits?

I’m reading from about 10K files stored in S3.

These are files created using Cascading, so it’s a Hadoop SequenceFile containing a (Cascading) Tuple for the key, and a Tuple for the value.

Removing some logic cruft, it looks like…

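        // Hadoop job configuration: register Cascading's Tuple serialization so the
        // SequenceFile keys and values can be deserialized.
        Job job = Job.getInstance();
        job.getConfiguration().set("io.serializations", "cascading.tuple.hadoop.TupleSerialization");
        FileInputFormat.addInputPath(job, new Path(inputDir));

        // Wrap the Hadoop SequenceFileInputFormat as a Flink input format.
        HadoopInputFormat<Tuple, Tuple> inputFormat = HadoopInputs.createHadoopInput(
                new SequenceFileInputFormat<Tuple, Tuple>(), Tuple.class, Tuple.class, job);

        // Flink-side parameters, requesting recursive file enumeration.
        Configuration parameters = new Configuration();
        parameters.setBoolean("recursive.file.enumeration", true);
        inputFormat.configure(parameters);

        // Source, then map each record to an AdDaily.
        DataSet<AdDaily> adDailies = env.createInput(inputFormat)
            .map(new CreateAdDaily())
            .name("ad dailies");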

Thanks again,

— Ken




On Mon, Jul 1, 2019 at 11:57 AM Till Rohrmann <[hidden email]> wrote:
Hi Ken,

in order to further debug your problems it would be helpful if you could share the log files on DEBUG level with us.

For problem (2), I suspect that it has been caused by Flink releasing TMs too early. This should be fixed with FLINK-10941 which is part of Flink 1.8.1. The 1.8.1 release should be released very soonish. It would be great if you could try your program with this version or even the 1.8.1 RC to see whether the problem still occurs. But it could also be caused by using fine grained recovery. So it might be worth a try to disable this feature if you turned it on.

Thanks a lot!

Cheers,
Till

On Thu, Jun 27, 2019 at 8:30 AM Biao Liu <[hidden email]> wrote:
Hi Ken again,

In regard to TimeoutException, I just realized that there is no akka.remote.OversizedPayloadException in your log file. There might be some other reason caused this.
1. Have you ever tried increasing the configuration "akka.ask.timeout"? 
2. Have you ever checked the garbage collection of JM/TM? Maybe you need to enable printing GC log first.


Biao Liu <[hidden email]> 于2019年6月27日周四 上午11:38写道:
Hi Ken,

In regard to oversized input splits, it seems to be a rare case beyond my expectation. However it should be fixed definitely since input split can be user-defined. We should not assume it must be small. 
I agree with Stephan that maybe there is something unexpectedly involved in the input splits.
And there is also a work-around way to solve this before we fixing it, increasing the limit of RPC message size by explicitly configuring "akka.framesize" in flink-conf.yaml.

To @Qi, also sorry to hear your bad experience. I'll take this issue but I'm not sure I could catch up the releasing of 1.9. Hope things go well.


Stephan Ewen <[hidden email]> 于2019年6月26日周三 下午10:50写道:
Hi Ken!

Sorry to hear you are going through this experience. The major focus on streaming so far means that the DataSet API has stability issues at scale.
So, yes, batch mode in current Flink version can be somewhat tricky.

It is a big focus of Flink 1.9 to fix the batch mode, finally, and by addressing batch specific scheduling / recovery / and shuffle issues.

Let me go through the issues you found:

(1) Input splits and oversized RPC

Your explanation seems correct, timeout due to dropping oversized RPC message.

I don't quite understand how that exactly happens, because the size limit is 10 MB and input splits should be rather small in most cases.
Are you running custom sources which put large data into splits? Maybe accidentally, by having a large serialized closure in the splits?

The fix would be this issue: https://issues.apache.org/jira/browse/FLINK-4399  

(2) TM early release

The 1.8 version had a fix that should work for regular cases without fine-grained failure recovery.
1.9 should have a more general fix that also works for fine-grained recovery

Are you trying to use the finer grained failover with the batch job?
The finer-grained failover is not working in batch for 1.8, that is why it is not an advertised feature (it only works for streaming so far).

The goal is that this works in the 1.9 release (aka the batch fixup release)

(3) Hang in Processing

I think a thread dump (jstack) from the TMs would be helpful to diagnose that.
There are known issues with the current batch shuffle implementation, which is why 1.9 is getting a new bounded-blocking stream shuffle implementation.

Best,
Stephan






On Mon, Jun 24, 2019 at 2:32 AM Ken Krugler <[hidden email]> wrote:
Hi all,

I’ve been running a somewhat complex batch job (in EMR/YARN) with Flink 1.8.0, and it regularly fails, but for varying reasons.

Has anyone else had stability with 1.8.0 in batch mode and non-trivial workflows?

Thanks,

— Ken

1. TimeoutException getting input splits

The batch job starts by processing a lot of files that live in S3. During this phase, I sometimes see:

2019-06-20 01:20:22,659 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN DataSource (at createInput(ExecutionEnvironment.java:549) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> Map (ad dailies) -> Filter (Filter at createWorkflow(AdvertiserSimilarityWorkflow.java:34)) -> Filter (Filter at createWorkflow(AdvertiserSimilarityWorkflow.java:36)) -> Filter (Filter at createWorkflow(AdvertiserSimilarityWorkflow.java:38)) -> Map (Key Extractor) -> Combine (Reduce at createWorkflow(AdvertiserSimilarityWorkflow.java:41)) (31/32) (8a8cbea47394c3d638910c36ac62d877) switched from RUNNING to FAILED.
java.lang.RuntimeException: Could not retrieve next input split.
at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
... 3 more
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
... 4 more
2019-06-20 01:20:22,664 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Java Job at Thu Jun 20 01:11:28 UTC 2019 (5564b8980f40d788d7ef312318709e4d) switched from state RUNNING to FAILING.
java.lang.RuntimeException: Could not retrieve next input split.
at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
... 3 more
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
... 4 more

I saw [hidden email]’s email recently about a similar issue:

I figured this out myself. In my yarn container logs I saw this warning/error,

akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@HOST:43911/temp/$n]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.jobmaster.SerializedInputSplit was 15728643 bytes.

Looking into this there is a max frame size for Akka which in flink can be set with akka.framesize and is set to 10MB by default. Increasing this past the size of my side input fixed the issue. I'm guessing this is due to creating the side input PCollection from memory using the Create.of APIs.

But no such akka.remote.OversizedPayloadException appears in any of my log files.

2. TM released too soon?

Sometimes it fails with "Connecting the channel failed: Connecting to remote task manager xxx has failed. This might indicate that the remote task manager has been lost."

I’d run into this previously with 1.7.2, but thought that 1.8.0 had the fix for https://issues.apache.org/jira/browse/FLINK-10941, and thus I’d avoid the problem, but it seems like there’s still an issue.

I’m running 3 TMs on three servers, each with 32 slots. When the job fails, the servers are under heavy CPU load.

From the logs, I see the JobManager releasing two of the TMs, then requesting two new containers. One of these requests gets filled, and that new TM starts getting tasks for its slots.

But then soon afterwards that new TM and the one original TM still left around start failing because they aren’t getting data from (I think) the other TM that was released.

Any thoughts on what’s going wrong? Is the bug not actually fully fixed? Or is there some TM timeout value that I should bump?

In the job manager log file I see where the two TMs are getting released...

2019-05-17 17:42:50,215 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl      - Releasing idle slot [d947cd800b0ef2671259c7b048c3f7fc].
2019-05-17 17:43:38,942 INFO  org.apache.flink.yarn.YarnResourceManager                     - Stopping container container_1558074033518_0003_01_000002.
2019-05-17 17:43:38,978 INFO  org.apache.flink.yarn.YarnResourceManager                     - Closing TaskExecutor connection container_1558074033518_0003_01_000002 because: TaskExecutor exceeded the idle timeout.
2019-05-17 17:43:38,978 INFO  org.apache.flink.yarn.YarnResourceManager                     - Stopping container container_1558074033518_0003_01_000004.
2019-05-17 17:43:38,998 INFO  org.apache.flink.yarn.YarnResourceManager                     - Closing TaskExecutor connection container_1558074033518_0003_01_000004 because: TaskExecutor exceeded the idle timeout.
2019-05-17 17:43:39,005 WARN  org.apache.flink.yarn.YarnResourceManager                     - Discard registration from TaskExecutor container_1558074033518_0003_01_000002 at (akka.tcp://[hidden email]:36311/user/taskmanager_0) because the framework did not recognize it
2019-05-17 17:43:39,006 WARN  org.apache.flink.yarn.YarnResourceManager                     - Discard registration from TaskExecutor container_1558074033518_0003_01_000004 at (akka.tcp://[hidden email]:44403/user/taskmanager_0) because the framework did not recognize it

And then later on the requests for the replacement TMs.

2019-05-17 17:45:01,655 INFO  org.apache.flink.yarn.YarnResourceManager                     - Requesting new TaskExecutor container with resources <memory:44000, vCores:32>. Number pending requests 1.
2019-05-17 17:45:01,662 INFO org.apache.flink.yarn.YarnResourceManager - Requesting new TaskExecutor container with resources <memory:44000, vCores:32>. Number pending requests 2.

And then one of the requests is satisfied:

2019-05-17 17:45:04,360 INFO  org.apache.flink.yarn.YarnResourceManager                     - Received new container: container_1558074033518_0003_01_000006 - Remaining pending container requests: 2
2019-05-17 17:45:04,360 INFO  org.apache.flink.yarn.YarnResourceManager                     - Removing container request Capability[<memory:44000, vCores:32>]Priority[1]. Pending container requests 1.
2019-05-17 17:45:04,836 INFO  org.apache.flink.yarn.YarnResourceManager                     - Creating container launch context for TaskManagers
2019-05-17 17:45:04,837 INFO  org.apache.flink.yarn.YarnResourceManager                     - Starting TaskManagers

So it seems like TMs are being allocated, but soon afterwards:

2019-05-17 17:45:12,907 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN Map (Map at createWorkflow(AdvertiserSimilarityWorkflow.java:127)) -> Map (Key Extractor) (73/96) (1e16d6ca293330933b7cece67644635f) switched from RUNNING to FAILED.
java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'ip-10-28-81-66.ec2.internal/10.28.81.66:40317' has failed. This might indicate that the remote task manager has been lost.
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:133)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:69)
	at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:60)
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:166)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:494)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:525)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508)
	at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
	at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
	at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:101)
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)

On one of the TMs that was released, I see at the end of its log:

2019-05-17 17:42:50,217 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable      - Free slot TaskSlot(index:3, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.7976931348623157E308, heapMemoryInMB=2147483647, directMemoryInMB=2147483647, nativeMemoryInMB=2147483647, networkMemoryInMB=2147483647}, allocationId: e3e7b383fe2db6376c82e5f3be7e02cb, jobId: eff57179c5c0e7d475c3b69d1a063017).
2019-05-17 17:42:50,217 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Remove job eff57179c5c0e7d475c3b69d1a063017 from job leader monitoring.
2019-05-17 17:42:50,217 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close JobManager connection for job eff57179c5c0e7d475c3b69d1a063017.
2019-05-17 17:42:50,222 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close JobManager connection for job eff57179c5c0e7d475c3b69d1a063017.
2019-05-17 17:42:50,222 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Cannot reconnect to job eff57179c5c0e7d475c3b69d1a063017 because it is not registered.
2019-05-17 17:43:38,982 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close ResourceManager connection 2c23394abcc92fbc068529591fbf7ceb.
2019-05-17 17:43:38,982 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Connecting to ResourceManager akka.tcp://[hidden email]:35979/user/resourcemanager(00000000000000000000000000000000).
2019-05-17 17:43:38,988 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
2019-05-17 17:43:38,988 INFO  org.apache.flink.runtime.blob.PermanentBlobCache              - Shutting down BLOB cache
2019-05-17 17:43:38,989 INFO  org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  - Shutting down TaskExecutorLocalStateStoresManager.
2019-05-17 17:43:38,990 INFO  org.apache.flink.runtime.blob.TransientBlobCache              - Shutting down BLOB cache
2019-05-17 17:43:38,991 INFO  org.apache.flink.runtime.filecache.FileCache                  - removed file cache directory /mnt/yarn/usercache/hadoop/appcache/application_1558074033518_0003/flink-dist-cache-a5e9444f-6bb6-4da1-9067-8d2e7351cb61
2019-05-17 17:43:38,991 INFO  org.apache.flink.runtime.filecache.FileCache                  - removed file cache directory /mnt1/yarn/usercache/hadoop/appcache/application_1558074033518_0003/flink-dist-cache-925eed9f-fcf5-4d48-8ac9-bce29e9116ef
2019-05-17 17:43:39,004 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Resolved ResourceManager address, beginning registration
2019-05-17 17:43:39,004 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Registration at ResourceManager attempt 1 (timeout=100ms)
2019-05-17 17:43:39,012 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Registration at ResourceManager was declined: unrecognized TaskExecutor
2019-05-17 17:43:39,012 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Pausing and re-attempting registration in 30000 ms

And in the replacement TM that was started, it fails with:

2019-05-17 17:45:12,048 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  Map (Key Extractor) (34/96)
java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'ip-10-47-197-146.ec2.internal/10.47.197.146:39133' has failed. This might indicate that the remote task manager has been lost.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)

Where the TM it’s trying to connect to is the one that was released and hasn’t been restarted yet.
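
The timing between these excerpts can be made explicit by diffing the log timestamps. A minimal sketch, assuming the log4j timestamp layout seen in the lines above (yyyy-MM-dd HH:mm:ss,SSS). LogGaps and its methods are hypothetical names, and the shortened log lines in main keep only the timestamp and a hint of the message:

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper (not part of Flink) for measuring gaps between log lines. */
class LogGaps {
    // Assumed layout of the timestamp prefix, e.g. "2019-05-17 17:42:50,215".
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Parses the first 23 characters of a log line as its timestamp. */
    static LocalDateTime timestampOf(String logLine) {
        return LocalDateTime.parse(logLine.substring(0, 23), LOG_TS);
    }

    /** Milliseconds elapsed between two log lines. */
    static long millisBetween(String earlier, String later) {
        return Duration.between(timestampOf(earlier), timestampOf(later)).toMillis();
    }

    public static void main(String[] args) {
        String slotReleased = "2019-05-17 17:42:50,215 INFO  ... Releasing idle slot";
        String containerStopped = "2019-05-17 17:43:38,942 INFO  ... Stopping container";
        String replacementRequested = "2019-05-17 17:45:01,655 INFO  ... Requesting new TaskExecutor container";
        String taskFailed = "2019-05-17 17:45:12,907 INFO  ... switched from RUNNING to FAILED";

        // prints 48727, 82713 and 11252 ms respectively
        System.out.println("idle slot released -> container stopped: "
                + millisBetween(slotReleased, containerStopped) + " ms");
        System.out.println("container stopped -> replacement requested: "
                + millisBetween(containerStopped, replacementRequested) + " ms");
        System.out.println("replacement requested -> task failed: "
                + millisBetween(replacementRequested, taskFailed) + " ms");
    }
}
```

By these timestamps, the two containers had been stopped for about 94 seconds when the first task failure was logged.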

3. Hang in processing

Sometimes it finishes the long-running (10 hour) operator, and then the two downstream operators get stuck (these have a different parallelism, so there’s a rebalance).

In the most recent example of this, they processed about 20% of the data emitted by the long-running operator. There are no errors in any of the logs. The last real activity in the jobmanager.log shows that all of the downstream operators were deployed...

2019-06-22 14:58:36,648 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN Map (Packed features) -> Map (Key Extractor) (7/32) (4a13a1d471c0ed5c2d9e66d2e4a98fd9) switched from DEPLOYING to RUNNING.

Then nothing anywhere, until this msg starts appearing in the log file every 5 seconds or so…

2019-06-22 22:56:11,303 INFO  org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl         - Updating with new AMRMToken




--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra


Re: Batch mode with Flink 1.8 unstable?

Ken Krugler
In reply to this post by Stephan Ewen
Hi Stephan,

Thanks for responding, comments inline below…

Regards,

— Ken

On Jun 26, 2019, at 7:50 AM, Stephan Ewen <[hidden email]> wrote:

Hi Ken!

Sorry to hear you are going through this experience. The major focus on streaming so far means that the DataSet API has stability issues at scale.
So, yes, batch mode in the current Flink version can be somewhat tricky.

It is a big focus of Flink 1.9 to finally fix batch mode by addressing batch-specific scheduling, recovery, and shuffle issues.

Let me go through the issues you found:

(1) Input splits and oversized RPC

Your explanation seems correct: the timeout is due to an oversized RPC message being dropped.

I don't quite understand how that exactly happens, because the size limit is 10 MB and input splits should be rather small in most cases.
Are you running custom sources which put large data into splits? Maybe accidentally, by having a large serialized closure in the splits?

As per my email to Till, I don’t feel like I’m doing anything tricky, though I am reading Hadoop sequence files that contain Cascading Tuple/Tuple key/value data.

The fix would be this issue: https://issues.apache.org/jira/browse/FLINK-4399  

(2) TM early release

The 1.8 version had a fix that should work for regular cases without fine-grained failure recovery.
1.9 should have a more general fix that also works for fine-grained recovery

Are you trying to use the finer grained failover with the batch job?

No, or at least I’m not doing anything special to enable it.

Is there something I need to do to explicitly _disable_ it?

The finer-grained failover is not working in batch for 1.8; that is why it is not an advertised feature (it only works for streaming so far).

The goal is that this works in the 1.9 release (aka the batch fixup release)

(3) Hang in Processing

I think a thread dump (jstack) from the TMs would be helpful to diagnose that.
There are known issues with the current batch shuffle implementation, which is why 1.9 is getting a new bounded-blocking stream shuffle implementation.

Next time it happens, I’ll dump the threads.

I should have done it this time, but was in a hurry to kill the EMR cluster as it had been costing money all night long :(
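
For reference, running jstack against the TM process is the direct route, but a similar dump can also be captured from inside a JVM when attaching to the process is inconvenient. The sketch below is a hypothetical helper (ThreadDumper is not a Flink class) that uses the standard java.lang.management API. It only sees the threads of the JVM it runs in, so it would have to be invoked from code running inside the TaskManager, which is an assumption about how it would be wired in:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

/** Hypothetical helper (not part of Flink): jstack-like dump of the current JVM. */
class ThreadDumper {

    /** Returns name, state and full stack trace of every live thread in this JVM. */
    static String dumpAllThreads() {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        // Only request lock details where the JVM supports them.
        ThreadInfo[] infos = bean.dumpAllThreads(
                bean.isObjectMonitorUsageSupported(),
                bean.isSynchronizerUsageSupported());

        StringBuilder sb = new StringBuilder();
        for (ThreadInfo info : infos) {
            sb.append('"').append(info.getThreadName()).append('"')
              .append(" state=").append(info.getThreadState()).append('\n');
            // ThreadInfo.toString() truncates long stacks, so print every frame explicitly.
            for (StackTraceElement frame : info.getStackTrace()) {
                sb.append("\tat ").append(frame).append('\n');
            }
            sb.append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(dumpAllThreads());
    }
}
```

In a hang like the one described, the interesting entries would be the task threads of the stuck downstream operators and whatever they are blocked on.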



On Mon, Jun 24, 2019 at 2:32 AM Ken Krugler <[hidden email]> wrote:


Re: Batch mode with Flink 1.8 unstable?

Till Rohrmann
Thanks for the update, Ken. The input splits seem to be org.apache.hadoop.mapred.FileSplit. Nothing too fancy catches my eye. Internally they use org.apache.hadoop.mapreduce.lib.input.FileSplit, which stores a Path, two long values, and two string arrays with hosts and host infos. I would assume that they do not exceed the 10 MB framesize limit.
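
One way to sanity-check that assumption is to measure how large a split is once serialized and compare it with the 10485760-byte limit quoted in the OversizedPayloadException earlier in the thread. The following is only a sketch under stated assumptions: FrameSizeCheck and FakeSplit are hypothetical names, FakeSplit is a stand-in that mimics the fields listed above rather than the real Hadoop or Flink split class, and plain Java serialization is used as an approximation that ignores any RPC envelope overhead:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical helper (not part of Flink) for estimating serialized object sizes. */
class FrameSizeCheck {
    // Default frame size in bytes, as quoted in the OversizedPayloadException message.
    static final long DEFAULT_FRAME_SIZE_BYTES = 10_485_760L;

    /** Size of the object in bytes after plain Java serialization. */
    static long serializedSize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    /** True if the serialized object is no larger than the given frame size. */
    static boolean fitsInFrame(Serializable obj, long frameSizeBytes) {
        return frameSizeBytes >= serializedSize(obj);
    }

    /** Stand-in for a file split: a path, two longs, host names, plus an optional payload. */
    static final class FakeSplit implements Serializable {
        private static final long serialVersionUID = 1L;
        final String path;
        final long start;
        final long length;
        final String[] hosts;
        final byte[] payload; // simulates data accidentally captured in a split

        FakeSplit(String path, long start, long length, String[] hosts, byte[] payload) {
            this.path = path;
            this.start = start;
            this.length = length;
            this.hosts = hosts;
            this.payload = payload;
        }
    }

    public static void main(String[] args) {
        // The path and host names are made up for illustration.
        FakeSplit plain = new FakeSplit("s3://bucket/part-00000", 0L, 1024L,
                new String[] {"host-a", "host-b"}, new byte[0]);
        FakeSplit bloated = new FakeSplit("s3://bucket/part-00000", 0L, 1024L,
                new String[] {"host-a", "host-b"}, new byte[15 * 1024 * 1024]);

        System.out.println("plain split:   " + serializedSize(plain) + " bytes, fits="
                + fitsInFrame(plain, DEFAULT_FRAME_SIZE_BYTES));
        System.out.println("bloated split: " + serializedSize(bloated) + " bytes, fits="
                + fitsInFrame(bloated, DEFAULT_FRAME_SIZE_BYTES));
    }
}
```

A split carrying only a path, offsets and host names stays far below the limit, so a split that fails this check would point to something large being captured alongside it.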

Once you see the problem happen again, it would also be helpful to save the logs.

Cheers,
Till

On Tue, Jul 2, 2019 at 2:21 AM Ken Krugler <[hidden email]> wrote:
Hi Stephan,

Thanks for responding, comments inline below…

Regards,

— Ken

On Jun 26, 2019, at 7:50 AM, Stephan Ewen <[hidden email]> wrote:

Hi Ken!

Sorry to hear you are going through this experience. The major focus on streaming so far means that the DataSet API has stability issues at scale.
So, yes, batch mode in current Flink version can be somewhat tricky.

It is a big focus of Flink 1.9 to fix the batch mode, finally, and by addressing batch specific scheduling / recovery / and shuffle issues.

Let me go through the issues you found:

(1) Input splits and oversized RPC

Your explanation seems correct, timeout due to dropping oversized RPC message.

I don't quite understand how that exactly happens, because the size limit is 10 MB and input splits should be rather small in most cases.
Are you running custom sources which put large data into splits? Maybe accidentally, by having a large serialized closure in the splits?

As per my email to Till, I don’t feel like I’m doing anything tricky, though I am reading Hadoop sequence files that contain Cascading Tuple/Tuple key/value data.

The fix would be this issue: https://issues.apache.org/jira/browse/FLINK-4399  

(2) TM early release

The 1.8 version had a fix that should work for regular cases without fine-grained failure recovery.
1.9 should have a more general fix that also works for fine-grained recovery

Are you trying to use the finer grained failover with the batch job?

No, or at least I’m not doing anything special to enable it.

Is there something I need to do to explicitly _disable_ it?

Finer-grained failover does not work for batch in 1.8, which is why it is not an advertised feature (it only works for streaming so far).

The goal is that this works in the 1.9 release (aka the batch fixup release).

(3) Hang in Processing

I think a thread dump (jstack) from the TMs would be helpful to diagnose that.
There are known issues with the current batch shuffle implementation, which is why 1.9 is getting a new bounded-blocking stream shuffle implementation.

Next time it happens, I’ll dump the threads.

I should have done it this time, but was in a hurry to kill the EMR cluster as it had been costing money all night long :(
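
For reference, running jstack against the TaskManager JVM's process id is the usual way to get such a dump. Where attaching jstack is awkward, roughly the same information can be collected from inside a JVM with the standard java.lang.management API. The sketch below is a generic JVM example under that assumption; the class name ThreadDumpSketch is made up and nothing in it is Flink-specific.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper: formats the state and stack of every live thread.
class ThreadDumpSketch {
    static String dump() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        StringBuilder sb = new StringBuilder();
        // false, false: do not collect locked monitors or synchronizers.
        for (ThreadInfo info : threads.dumpAllThreads(false, false)) {
            sb.append('"').append(info.getThreadName()).append("\" ")
              .append(info.getThreadState()).append('\n');
            for (StackTraceElement frame : info.getStackTrace()) {
                sb.append("    at ").append(frame).append('\n');
            }
            sb.append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(dump());
    }
}
```

For a hang like the one described, the interesting part of such a dump would be which threads are blocked or waiting and on which frames.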



Re: Batch mode with Flink 1.8 unstable?

Ken Krugler
In reply to this post by Till Rohrmann
Hi Till,

I tried out 1.9.0 with my workflow, and I am no longer running into the errors I described below, which is great!

Just to recap, this is batch, per-job mode on YARN/EMR.

Though I did run into a new issue, related to my previous problem when reading files written via SerializedOutputFormat.

I would always get errors that look like:

2019-09-16 20:58:21,396 ERROR com.company.MyWorkflow  - Exception reading from split #100 of file 's3://path-to-file/19' from 0 (state 28683/156308, block size 67108864)
2019-09-16 20:58:21,397 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN DataSource (at makePreparedDataSet(com.company.MyWorkflow.java:67) (com.company.MyWorkflow$AdTextInputFormat)) -> Map (Key Extractor) (4/12)
java.io.UTFDataFormatException: malformed input around byte 51
at java.io.DataInputStream.readUTF(DataInputStream.java:656)
at java.io.DataInputStream.readUTF(DataInputStream.java:564)
at com.company.AdText.read(AdText.java:170)
at org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:39)
at org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:32)
at org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:305)
at com.company.MyWorkflow$AdTextInputFormat.nextRecord(MyWorkflow.java:90)
at com.company.MyWorkflow$AdTextInputFormat.nextRecord(MyWorkflow.java:71)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:195)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)

Which would imply (again) an issue with the read block size not being the same as what was used to write it.

But I’d run this same data through a different workflow, without any issues.

When I reduced the read parallelism of the failing workflow to match the succeeding workflow (was 12, dropped it to 4), the errors went away.
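
To illustrate the kind of failure a block-size mismatch produces, here is a toy sketch. It uses a made-up block layout (a record count, records written with writeUTF, then zero padding), not Flink's actual SerializedOutputFormat layout, and all names in it are hypothetical. A reader that assumes a different block size starts decoding in the middle of padding or records.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Toy format, for illustration only. Each block is:
// [int recordCount][records via writeUTF][zero padding up to blockSize]
class BlockSizeMismatchDemo {

    static byte[] write(List<String> records, int blockSize) {
        try {
            ByteArrayOutputStream file = new ByteArrayOutputStream();
            ByteArrayOutputStream body = new ByteArrayOutputStream();
            int count = 0;
            for (String record : records) {
                ByteArrayOutputStream one = new ByteArrayOutputStream();
                new DataOutputStream(one).writeUTF(record);
                if (4 + one.size() > blockSize) {
                    throw new IllegalArgumentException("record does not fit into a block");
                }
                // Close the current block when the record no longer fits.
                if (4 + body.size() + one.size() > blockSize) {
                    closeBlock(file, body, count, blockSize);
                    count = 0;
                }
                one.writeTo(body);
                count++;
            }
            if (count > 0) {
                closeBlock(file, body, count, blockSize);
            }
            return file.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void closeBlock(ByteArrayOutputStream file, ByteArrayOutputStream body,
                                   int count, int blockSize) throws IOException {
        new DataOutputStream(file).writeInt(count);
        body.writeTo(file);
        file.write(new byte[blockSize - 4 - body.size()]); // zero padding
        body.reset();
    }

    // Reads the file assuming every block is exactly blockSize bytes long.
    static List<String> read(byte[] file, int blockSize) throws IOException {
        List<String> records = new ArrayList<>();
        for (int offset = 0; offset < file.length; offset += blockSize) {
            int len = Math.min(blockSize, file.length - offset);
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(file, offset, len));
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                records.add(in.readUTF());
            }
        }
        return records;
    }

    // True only if reading with readSize returns exactly what was written.
    static boolean roundTrips(List<String> records, int writeSize, int readSize) {
        try {
            return read(write(records, writeSize), readSize).equals(records);
        } catch (IOException e) {
            return false; // misaligned reads fail while decoding
        }
    }

    static List<String> sampleRecords() {
        List<String> records = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            records.add(String.format("record-%013d", i));
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> records = sampleRecords();
        System.out.println("write 64 / read 64: " + roundTrips(records, 64, 64));
        System.out.println("write 64 / read 48: " + roundTrips(records, 64, 48));
    }
}
```

In this toy, reading with the block size used for writing returns every record, while reading with a different size lands inside padding or record bytes and the decode fails. Whether the same mechanism is behind the UTFDataFormatException above is not established in this thread.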

So… I don’t know what the root issue is, but I have a workaround for now.

It is reproducible, though, so I’d like to use it to help track down the cause.

Any suggestions for how to debug further?

Thanks,

— Ken

 
On Jul 1, 2019, at 2:57 AM, Till Rohrmann <[hidden email]> wrote:

Hi Ken,

In order to debug your problems further, it would be helpful if you could share the log files at DEBUG level with us.

For problem (2), I suspect that it has been caused by Flink releasing TMs too early. This should be fixed with FLINK-10941, which is part of Flink 1.8.1. The 1.8.1 release should be out very soon. It would be great if you could try your program with this version or even the 1.8.1 RC to see whether the problem still occurs. But it could also be caused by using fine-grained recovery, so it might be worth trying to disable this feature if you turned it on.

Thanks a lot!

Cheers,
Till

On Thu, Jun 27, 2019 at 8:30 AM Biao Liu <[hidden email]> wrote:
Hi Ken again,

Regarding the TimeoutException, I just realized that there is no akka.remote.OversizedPayloadException in your log file. Something else might be causing this.
1. Have you ever tried increasing the configuration "akka.ask.timeout"? 
2. Have you ever checked the garbage collection of JM/TM? Maybe you need to enable printing GC log first.


On Thu, Jun 27, 2019 at 11:38 AM Biao Liu <[hidden email]> wrote:
Hi Ken,

Regarding the oversized input splits, this seems to be a rare case that I did not expect. However, it should definitely be fixed, since input splits can be user-defined; we should not assume they are always small.
I agree with Stephan that maybe there is something unexpectedly involved in the input splits.
There is also a workaround until we fix it: increase the RPC message size limit by explicitly configuring "akka.framesize" in flink-conf.yaml.

@Qi, I'm also sorry to hear about your bad experience. I'll take this issue, but I'm not sure I can get it done in time for the 1.9 release. Hope things go well.


2019-05-17 17:43:39,012 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Registration at ResourceManager was declined: unrecognized TaskExecutor
2019-05-17 17:43:39,012 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Pausing and re-attempting registration in 30000 ms

And in the replacement TM that was started, it fails with:

2019-05-17 17:45:12,048 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  Map (Key Extractor) (34/96)
java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'ip-10-47-197-146.ec2.internal/10.47.197.146:39133' has failed. This might indicate that the remote task manager has been lost.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)

Where the TM it’s trying to connect to is the one that was released and hasn’t been restarted yet.

3. Hang in processing

Sometimes it finishes the long-running (10 hour) operator, and then the two downstream operators get stuck (these have a different parallelism, so there’s a rebalance)

In the most recent example of this, they processed about 20% of the data emitted by the long running operator. There are no errors in any of the logs. The last real activity in the jobmanager.log shows that all of the downstream operators were deployed...

2019-06-22 14:58:36,648 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN Map (Packed features) -> Map (Key Extractor) (7/32) (4a13a1d471c0ed5c2d9e66d2e4a98fd9) switched from DEPLOYING to RUNNING.

Then nothing anywhere, until this msg starts appearing in the log file every 5 seconds or so…

2019-06-22 22:56:11,303 INFO  org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl         - Updating with new AMRMToken




--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra


Re: Batch mode with Flink 1.8 unstable?

Till Rohrmann
Good to hear that some of your problems have been solved, Ken. For the UTFDataFormatException it is hard to tell. Usually it indicates that the input being read was not produced using `writeUTF`. Could you maybe provide an example program which reproduces the problem? Moreover, it would be helpful to see how the input is generated and what AdText exactly does.

Cheers,
Till

On Wed, Sep 18, 2019 at 9:17 PM Ken Krugler <[hidden email]> wrote:
Hi Till,

I tried out 1.9.0 with my workflow, and I no longer am running into the errors I described below, which is great!

Just to recap, this is batch, per-job mode on YARN/EMR.

Though I did run into a new issue, related to my previous problem when reading files written via SerializedOutputFormat.

I would always get errors that look like:

2019-09-16 20:58:21,396 ERROR com.company.MyWorkflow  - Exception reading from split #100 of file 's3://path-to-file/19' from 0 (state 28683/156308, block size 67108864)
2019-09-16 20:58:21,397 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN DataSource (at makePreparedDataSet(com.company.MyWorkflow.java:67) (com.company.MyWorkflow$AdTextInputFormat)) -> Map (Key Extractor) (4/12)
java.io.UTFDataFormatException: malformed input around byte 51
at java.io.DataInputStream.readUTF(DataInputStream.java:656)
at java.io.DataInputStream.readUTF(DataInputStream.java:564)
at com.company.AdText.read(AdText.java:170)
at org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:39)
at org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:32)
at org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:305)
at com.company.MyWorkflow$AdTextInputFormat.nextRecord(MyWorkflow.java:90)
at com.company.MyWorkflow$AdTextInputFormat.nextRecord(MyWorkflow.java:71)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:195)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)

Which would imply (again) an issue with the read block size not being the same as what was used to write it.
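
To illustrate how a misaligned read can produce this exception, here is a minimal sketch using only `java.io`. The record layout (a 4-byte int followed by a `writeUTF` string) and the class name `MisalignedReadDemo` are hypothetical; the actual layout of AdText is not shown in this thread. Starting `readUTF` two bytes too early makes it interpret part of the int as the string length, and the decode then fails.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UTFDataFormatException;
import java.io.UncheckedIOException;

public class MisalignedReadDemo {

    /** Serializes one record as [4-byte int id][writeUTF text] (hypothetical layout). */
    static byte[] writeRecord(int id, String text) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeInt(id);
            out.writeUTF(text);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Calls readUTF starting at the given byte offset and reports what happened. */
    static String readTextAt(byte[] data, int offset) {
        try (DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(data, offset, data.length - offset))) {
            return "read: " + in.readUTF();
        } catch (UTFDataFormatException e) {
            return "UTFDataFormatException: " + e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] record = writeRecord(3, "\u00e9");
        // Offset 4 is where the string's length prefix really starts.
        System.out.println(readTextAt(record, 4));
        // Offset 2 is inside the int, so the length prefix is misread.
        System.out.println(readTextAt(record, 2));
    }
}
```

If the reader's idea of where a block or record starts differs from the writer's, the same kind of failure would be expected, which is consistent with the block-size suspicion above but does not prove it.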

But I’d run this same data through a different workflow, without any issues.

When I reduced the read parallelism of the failing workflow to match the succeeding workflow (was 12, dropped it to 4), the errors went away.

So…don’t know what’s the root issue, but I have a workaround for now.

It is a reproducible problem, though, so I’d like to use it to help track down the root cause.

Any suggestions for how to debug further?

Thanks,

— Ken

 
On Jul 1, 2019, at 2:57 AM, Till Rohrmann <[hidden email]> wrote:

Hi Ken,

in order to further debug your problems it would be helpful if you could share the log files on DEBUG level with us.

For problem (2), I suspect that it has been caused by Flink releasing TMs too early. This should be fixed with FLINK-10941, which is part of Flink 1.8.1. The 1.8.1 release should be out very soon. It would be great if you could try your program with this version, or even the 1.8.1 RC, to see whether the problem still occurs. But it could also be caused by using fine-grained recovery, so it might be worth disabling this feature if you turned it on.

Thanks a lot!

Cheers,
Till

On Thu, Jun 27, 2019 at 8:30 AM Biao Liu <[hidden email]> wrote:
Hi Ken again,

Regarding the TimeoutException, I just realized that there is no akka.remote.OversizedPayloadException in your log file, so something else may have caused it.
1. Have you tried increasing the "akka.ask.timeout" setting?
2. Have you checked the garbage collection behavior of the JM/TM? You may need to enable GC logging first.
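
As a rough illustration of the GC check, the sketch below reads collector statistics through the standard `java.lang.management` API. It only reports on the JVM it runs in; the class name `GcSnapshot` is hypothetical, and on a real cluster the usual approach would be to turn on GC logging for the JM/TM JVMs and read the resulting log files instead.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;

public class GcSnapshot {

    /** One line per collector: name, number of collections, accumulated collection time. */
    static List<String> describeCollectors() {
        List<String> lines = new ArrayList<>();
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            // Both counters may be -1 if the JVM does not track them for a collector.
            lines.add(String.format("%s: count=%d, timeMs=%d",
                    gc.getName(), gc.getCollectionCount(), gc.getCollectionTime()));
        }
        return lines;
    }

    public static void main(String[] args) {
        describeCollectors().forEach(System.out::println);
    }
}
```

A large and fast-growing `timeMs` relative to wall-clock time would suggest that long GC pauses could be contributing to RPC timeouts.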


On Thu, Jun 27, 2019 at 11:38 AM Biao Liu <[hidden email]> wrote:
Hi Ken,

Regarding oversized input splits, this seems to be a rare case that I did not expect. However, it should definitely be fixed, since input splits can be user-defined and we should not assume they are always small.
I agree with Stephan that something unexpected may be included in the input splits.
Until we fix it, there is also a workaround: increase the limit on the RPC message size by explicitly configuring "akka.framesize" in flink-conf.yaml.

To @Qi: I'm also sorry to hear about your bad experience. I'll take this issue, but I'm not sure I can finish it in time for the 1.9 release. Hope things go well.


On Wed, Jun 26, 2019 at 10:50 PM Stephan Ewen <[hidden email]> wrote:
Hi Ken!

Sorry to hear you are going through this experience. The major focus on streaming so far means that the DataSet API has stability issues at scale.
So, yes, batch mode in the current Flink version can be somewhat tricky.

It is a big focus of Flink 1.9 to finally fix batch mode by addressing batch-specific scheduling, recovery, and shuffle issues.

Let me go through the issues you found:

(1) Input splits and oversized RPC

Your explanation seems correct, timeout due to dropping oversized RPC message.
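
The failure mode described here, a reply that never arrives because the message was dropped, can be sketched with a plain `CompletableFuture`. This is a stand-in, not Flink's RPC code: the class name `DroppedReplyDemo` and the reply value are hypothetical, and the only link to the stack trace is that it also ends in a timed `CompletableFuture.get`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DroppedReplyDemo {

    /** Waits for a reply and returns "timeout" if none arrives in time. */
    static String awaitReply(CompletableFuture<String> reply, long timeoutMillis) {
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The caller only sees a timeout; it cannot tell why no reply came.
            return "timeout";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        // A future that nobody completes models a reply that was silently discarded.
        System.out.println(awaitReply(new CompletableFuture<>(), 100));
        // A completed future models a reply that made it through.
        System.out.println(awaitReply(CompletableFuture.completedFuture("split-7"), 100));
    }
}
```

This is why the requesting side shows only a bare TimeoutException: any evidence of a discarded message would be in the log of the process that dropped it.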

I don't quite understand how that exactly happens, because the size limit is 10 MB and input splits should be rather small in most cases.
Are you running custom sources which put large data into splits? Maybe accidentally, by having a large serialized closure in the splits?
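
One way to check for an accidentally large split is to serialize it and compare the size with the 10 MB limit. The sketch below uses plain Java serialization as an approximation of what gets sent over RPC; `DemoSplit` and `SplitSizeCheck` are hypothetical names, and the limit is the `max allowed size 10485760 bytes` value from the OversizedPayloadException message quoted elsewhere in this thread.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SplitSizeCheck {

    /** Limit taken from the OversizedPayloadException message quoted in this thread. */
    static final long FRAME_LIMIT_BYTES = 10_485_760L;

    /** Hypothetical split that carries extra data next to its small metadata. */
    static class DemoSplit implements Serializable {
        private static final long serialVersionUID = 1L;
        final int splitNumber;
        final byte[] capturedData;

        DemoSplit(int splitNumber, byte[] capturedData) {
            this.splitNumber = splitNumber;
            this.capturedData = capturedData;
        }
    }

    /** Size in bytes of the object after plain Java serialization. */
    static long serializedSize(Serializable object) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(object);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.size();
    }

    static boolean exceedsFrameLimit(Serializable object) {
        return serializedSize(object) > FRAME_LIMIT_BYTES;
    }

    public static void main(String[] args) {
        System.out.println(exceedsFrameLimit(new DemoSplit(0, new byte[16])));
        System.out.println(exceedsFrameLimit(new DemoSplit(1, new byte[15 * 1024 * 1024])));
    }
}
```

Running a check like this against the splits a custom source actually produces would show whether any of them come close to the limit.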

The fix would be this issue: https://issues.apache.org/jira/browse/FLINK-4399  

(2) TM early release

The 1.8 version had a fix that should work for regular cases without fine-grained failure recovery.
1.9 should have a more general fix that also works for fine-grained recovery.

Are you trying to use the finer-grained failover with the batch job?
Finer-grained failover does not work for batch in 1.8, which is why it is not an advertised feature (it only works for streaming so far).

The goal is that this works in the 1.9 release (aka the batch fixup release)

(3) Hang in Processing

I think a thread dump (jstack) from the TMs would be helpful to diagnose that.
There are known issues with the current batch shuffle implementation, which is why 1.9 is getting a new bounded-blocking stream shuffle implementation.
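
Running `jstack` against the TM process is the direct way to get such a dump. Where attaching to the process is not possible, a similar snapshot can be taken from inside a JVM with `Thread.getAllStackTraces()`. The sketch below is an in-process approximation only; the class name `InProcessThreadDump` is hypothetical, and unlike `jstack` it does not report lock ownership.

```java
import java.util.Map;

public class InProcessThreadDump {

    /** Formats the name, state, and stack frames of every live thread in this JVM. */
    static String dumpAllThreads() {
        StringBuilder dump = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            dump.append('"').append(thread.getName()).append('"')
                .append(" state=").append(thread.getState()).append('\n');
            for (StackTraceElement frame : entry.getValue()) {
                dump.append("\tat ").append(frame).append('\n');
            }
            dump.append('\n');
        }
        return dump.toString();
    }

    public static void main(String[] args) {
        System.out.print(dumpAllThreads());
    }
}
```

For a hang like the one described, the interesting entries would be task threads sitting in WAITING or BLOCKED state and the frames they are parked in.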

Best,
Stephan






On Mon, Jun 24, 2019 at 2:32 AM Ken Krugler <[hidden email]> wrote:
Hi all,

I’ve been running a somewhat complex batch job (in EMR/YARN) with Flink 1.8.0, and it regularly fails, but for varying reasons.

Has anyone else had stability issues with 1.8.0 in batch mode and non-trivial workflows?

Thanks,

— Ken

1. TimeoutException getting input splits

The batch job starts by processing a lot of files that live in S3. During this phase, I sometimes see:

2019-06-20 01:20:22,659 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN DataSource (at createInput(ExecutionEnvironment.java:549) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> Map (ad dailies) -> Filter (Filter at createWorkflow(MyWorkflow.java:34)) -> Filter (Filter at createWorkflow(MyWorkflow.java:36)) -> Filter (Filter at createWorkflow(MyWorkflow.java:38)) -> Map (Key Extractor) -> Combine (Reduce at createWorkflow(MyWorkflow.java:41)) (31/32) (8a8cbea47394c3d638910c36ac62d877) switched from RUNNING to FAILED.
java.lang.RuntimeException: Could not retrieve next input split.
at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
... 3 more
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
... 4 more
2019-06-20 01:20:22,664 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Java Job at Thu Jun 20 01:11:28 UTC 2019 (5564b8980f40d788d7ef312318709e4d) switched from state RUNNING to FAILING.
java.lang.RuntimeException: Could not retrieve next input split.
at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
... 3 more
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
... 4 more

I saw [hidden email]’s email recently about a similar issue:

I figured this out myself. In my yarn container logs I saw this warning/error,

akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@HOST:43911/temp/$n]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.jobmaster.SerializedInputSplit was 15728643 bytes.

Looking into this, there is a max frame size for Akka, which in Flink can be set with akka.framesize and defaults to 10 MB. Increasing this past the size of my side input fixed the issue. I'm guessing this is due to creating the side input PCollection from memory using the Create.of APIs.

But no such akka.remote.OversizedPayloadException appears in any of my log files.

2. TM released too soon?

Sometimes it fails with "Connecting the channel failed: Connecting to remote task manager xxx has failed. This might indicate that the remote task manager has been lost."

I’d run into this previously with 1.7.2, but thought that 1.8.0 had the fix for https://issues.apache.org/jira/browse/FLINK-10941, and thus I’d avoid the problem, but it seems like there’s still an issue.

I’m running 3 TMs on three servers, each with 32 slots. When the job fails, the servers are under heavy CPU load.

From the logs, I see the JobManager releasing two of the TMs, then requesting two new containers. One of these requests gets filled, and that new TM starts getting tasks for its slots.

But then soon afterwards that new TM and the one original TM still left around start failing because they aren’t getting data from (I think) the other TM that was released.

Any thoughts on what’s going wrong? Is the bug not actually fully fixed? Or is there some TM timeout value that I should bump?

In the job manager log file I see where the two TMs are getting released...

2019-05-17 17:42:50,215 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl      - Releasing idle slot [d947cd800b0ef2671259c7b048c3f7fc].
2019-05-17 17:43:38,942 INFO  org.apache.flink.yarn.YarnResourceManager                     - Stopping container container_1558074033518_0003_01_000002.
2019-05-17 17:43:38,978 INFO  org.apache.flink.yarn.YarnResourceManager                     - Closing TaskExecutor connection container_1558074033518_0003_01_000002 because: TaskExecutor exceeded the idle timeout.
2019-05-17 17:43:38,978 INFO  org.apache.flink.yarn.YarnResourceManager                     - Stopping container container_1558074033518_0003_01_000004.
2019-05-17 17:43:38,998 INFO  org.apache.flink.yarn.YarnResourceManager                     - Closing TaskExecutor connection container_1558074033518_0003_01_000004 because: TaskExecutor exceeded the idle timeout.
2019-05-17 17:43:39,005 WARN  org.apache.flink.yarn.YarnResourceManager                     - Discard registration from TaskExecutor container_1558074033518_0003_01_000002 at (akka.tcp://[hidden email]:36311/user/taskmanager_0) because the framework did not recognize it
2019-05-17 17:43:39,006 WARN  org.apache.flink.yarn.YarnResourceManager                     - Discard registration from TaskExecutor container_1558074033518_0003_01_000004 at (akka.tcp://[hidden email]:44403/user/taskmanager_0) because the framework did not recognize it

And then later on the requests for the replacement TMs.

2019-05-17 17:45:01,655 INFO  org.apache.flink.yarn.YarnResourceManager                     - Requesting new TaskExecutor container with resources <memory:44000, vCores:32>. Number pending requests 1.
2019-05-17 17:45:01,662 INFO  org.apache.flink.yarn.YarnResourceManager                     - Requesting new TaskExecutor container with resources <memory:44000, vCores:32>. Number pending requests 2.

And then one of the requests is satisfied:

2019-05-17 17:45:04,360 INFO  org.apache.flink.yarn.YarnResourceManager                     - Received new container: container_1558074033518_0003_01_000006 - Remaining pending container requests: 2
2019-05-17 17:45:04,360 INFO  org.apache.flink.yarn.YarnResourceManager                     - Removing container request Capability[<memory:44000, vCores:32>]Priority[1]. Pending container requests 1.
2019-05-17 17:45:04,836 INFO  org.apache.flink.yarn.YarnResourceManager                     - Creating container launch context for TaskManagers
2019-05-17 17:45:04,837 INFO  org.apache.flink.yarn.YarnResourceManager                     - Starting TaskManagers

So it seems like TMs are being allocated, but soon afterwards:

2019-05-17 17:45:12,907 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN Map (Map at createWorkflow(MyWorkflow.java:127)) -> Map (Key Extractor) (73/96) (1e16d6ca293330933b7cece67644635f) switched from RUNNING to FAILED.
java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'ip-10-28-81-66.ec2.internal/10.28.81.66:40317' has failed. This might indicate that the remote task manager has been lost.
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:133)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:69)
	at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:60)
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:166)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:494)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:525)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508)
	at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
	at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
	at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:101)
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)

On one of the TMs that was released, I see at the end of its log:

2019-05-17 17:42:50,217 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable      - Free slot TaskSlot(index:3, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.7976931348623157E308, heapMemoryInMB=2147483647, directMemoryInMB=2147483647, nativeMemoryInMB=2147483647, networkMemoryInMB=2147483647}, allocationId: e3e7b383fe2db6376c82e5f3be7e02cb, jobId: eff57179c5c0e7d475c3b69d1a063017).
2019-05-17 17:42:50,217 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Remove job eff57179c5c0e7d475c3b69d1a063017 from job leader monitoring.
2019-05-17 17:42:50,217 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close JobManager connection for job eff57179c5c0e7d475c3b69d1a063017.
2019-05-17 17:42:50,222 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close JobManager connection for job eff57179c5c0e7d475c3b69d1a063017.
2019-05-17 17:42:50,222 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Cannot reconnect to job eff57179c5c0e7d475c3b69d1a063017 because it is not registered.
2019-05-17 17:43:38,982 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close ResourceManager connection 2c23394abcc92fbc068529591fbf7ceb.
2019-05-17 17:43:38,982 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Connecting to ResourceManager akka.tcp://[hidden email]:35979/user/resourcemanager(00000000000000000000000000000000).
2019-05-17 17:43:38,988 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
2019-05-17 17:43:38,988 INFO  org.apache.flink.runtime.blob.PermanentBlobCache              - Shutting down BLOB cache
2019-05-17 17:43:38,989 INFO  org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  - Shutting down TaskExecutorLocalStateStoresManager.
2019-05-17 17:43:38,990 INFO  org.apache.flink.runtime.blob.TransientBlobCache              - Shutting down BLOB cache
2019-05-17 17:43:38,991 INFO  org.apache.flink.runtime.filecache.FileCache                  - removed file cache directory /mnt/yarn/usercache/hadoop/appcache/application_1558074033518_0003/flink-dist-cache-a5e9444f-6bb6-4da1-9067-8d2e7351cb61
2019-05-17 17:43:38,991 INFO  org.apache.flink.runtime.filecache.FileCache                  - removed file cache directory /mnt1/yarn/usercache/hadoop/appcache/application_1558074033518_0003/flink-dist-cache-925eed9f-fcf5-4d48-8ac9-bce29e9116ef
2019-05-17 17:43:39,004 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Resolved ResourceManager address, beginning registration
2019-05-17 17:43:39,004 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Registration at ResourceManager attempt 1 (timeout=100ms)
2019-05-17 17:43:39,012 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Registration at ResourceManager was declined: unrecognized TaskExecutor
2019-05-17 17:43:39,012 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Pausing and re-attempting registration in 30000 ms

And in the replacement TM that was started, it fails with:

2019-05-17 17:45:12,048 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  Map (Key Extractor) (34/96)
java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'ip-10-47-197-146.ec2.internal/10.47.197.146:39133' has failed. This might indicate that the remote task manager has been lost.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)

Where the TM it’s trying to connect to is the one that was released and hasn’t been restarted yet.

3. Hang in processing

Sometimes it finishes the long-running (10 hour) operator, and then the two downstream operators get stuck (these have a different parallelism, so there’s a rebalance)

In the most recent example of this, they processed about 20% of the data emitted by the long running operator. There are no errors in any of the logs. The last real activity in the jobmanager.log shows that all of the downstream operators were deployed...

2019-06-22 14:58:36,648 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN Map (Packed features) -> Map (Key Extractor) (7/32) (4a13a1d471c0ed5c2d9e66d2e4a98fd9) switched from DEPLOYING to RUNNING.

Then nothing anywhere, until this msg starts appearing in the log file every 5 seconds or so…

2019-06-22 22:56:11,303 INFO  org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl         - Updating with new AMRMToken




--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra


Re: Batch mode with Flink 1.8 unstable?

Fabian Hueske-2
Hi Ken,

Changing the parallelism can affect the generation of input splits.
I had a look at BinaryInputFormat, and it adds a bunch of empty input splits if the number of generated splits is less than the minimum number of splits (which is equal to the parallelism).


Maybe these empty splits cause the failure.
IIRC, this was done because there was at some point (like several years ago...) the requirement that each source task would receive a split.
I don't think this is still true. I'd try to remove these lines and see what happens.
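
The padding behavior described above can be sketched as follows. This is a simplified illustration, not the actual BinaryInputFormat code: the `Split` class, the method names, and the assumption that the file yields four real block-sized splits are all hypothetical. The block size is the 67108864 value from the error log.

```java
import java.util.ArrayList;
import java.util.List;

public class EmptySplitPadding {

    /** Hypothetical stand-in for a file split: index, start offset, length in bytes. */
    static final class Split {
        final int index;
        final long start;
        final long length;

        Split(int index, long start, long length) {
            this.index = index;
            this.start = start;
            this.length = length;
        }
    }

    /** Builds `count` consecutive splits of `blockSize` bytes each. */
    static List<Split> blockSplits(int count, long blockSize) {
        List<Split> splits = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            splits.add(new Split(i, i * blockSize, blockSize));
        }
        return splits;
    }

    /** Appends zero-length splits until the list holds at least minNumSplits entries. */
    static List<Split> padToMinimum(List<Split> realSplits, int minNumSplits) {
        List<Split> padded = new ArrayList<>(realSplits);
        long endOfData = 0;
        for (Split split : realSplits) {
            endOfData = Math.max(endOfData, split.start + split.length);
        }
        for (int index = padded.size(); index < minNumSplits; index++) {
            padded.add(new Split(index, endOfData, 0));
        }
        return padded;
    }

    static long countEmpty(List<Split> splits) {
        return splits.stream().filter(split -> split.length == 0).count();
    }

    public static void main(String[] args) {
        List<Split> real = blockSplits(4, 67108864L);
        System.out.println(countEmpty(padToMinimum(real, 12)));
        System.out.println(countEmpty(padToMinimum(real, 4)));
    }
}
```

Under these assumptions, a parallelism of 12 would add eight empty splits while a parallelism of 4 would add none, which would fit the observation that lowering the read parallelism made the errors go away.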

If that doesn't help, I'd try to add a bunch of log statements in the InputFormat to identify the point where it fails.

Hope this helps,
Fabian


Am Do., 19. Sept. 2019 um 09:25 Uhr schrieb Till Rohrmann <[hidden email]>:
Good to hear that some of your problems have been solved Ken. For the UTFDataFormatException it is hard to tell. Usually it says that the input has been produced using `writeUTF`. Cloud you maybe provide an example program which reproduces the problem? Moreover, it would be helpful to see how the input is generated and what AdText exactly does.

Cheers,
Till

On Wed, Sep 18, 2019 at 9:17 PM Ken Krugler <[hidden email]> wrote:
Hi Till,

I tried out 1.9.0 with my workflow, and I no longer am running into the errors I described below, which is great!

Just to recap, this is batch, per-job mode on YARN/EMR.

Though I did run into a new issue, related to my previous problem when reading files written via SerializedOutputFormat.

I would always get errors that look like:

2019-09-16 20:58:21,396 ERROR com.company.MyWorkflow  - Exception reading from split #100 of file 's3://path-to-file/19' from 0 (state 28683/156308, block size 67108864)
2019-09-16 20:58:21,397 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN DataSource (at makePreparedDataSet(com.company.MyWorkflow.java:67) (com.company.MyWorkflow$AdTextInputFormat)) -> Map (Key Extractor) (4/12)
java.io.UTFDataFormatException: malformed input around byte 51
at java.io.DataInputStream.readUTF(DataInputStream.java:656)
at java.io.DataInputStream.readUTF(DataInputStream.java:564)
at com.company.AdText.read(AdText.java:170)
at org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:39)
at org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:32)
at org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:305)
at com.company.MyWorkflow$AdTextInputFormat.nextRecord(MyWorkflow.java:90)
at com.company.MyWorkflow$AdTextInputFormat.nextRecord(MyWorkflow.java:71)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:195)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)

Which would imply (again) an issue with the read block size not being the same as what was used to write it.

But I’d run this same data through a different workflow, without any issues.

When I reduced the read parallelism of the failing workflow to match the succeeding workflow (was 12, dropped it to 4), the errors went away.

So…don’t know what’s the root issue, but I have a workaround for now.

Though it’s a reproducible problem, which I’d like to use to help solve the problem.

Any suggestions for how to debug further?

Thanks,

— Ken

 
On Jul 1, 2019, at 2:57 AM, Till Rohrmann <[hidden email]> wrote:

Hi Ken,

in order to further debug your problems it would be helpful if you could share the log files on DEBUG level with us.

For problem (2), I suspect that it has been caused by Flink releasing TMs too early. This should be fixed with FLINK-10941 which is part of Flink 1.8.1. The 1.8.1 release should be released very soonish. It would be great if you could try your program with this version or even the 1.8.1 RC to see whether the problem still occurs. But it could also be caused by using fine grained recovery. So it might be worth a try to disable this feature if you turned it on.

Thanks a lot!

Cheers,
Till

On Thu, Jun 27, 2019 at 8:30 AM Biao Liu <[hidden email]> wrote:
Hi Ken again,

In regard to TimeoutException, I just realized that there is no akka.remote.OversizedPayloadException in your log file. There might be some other reason caused this.
1. Have you ever tried increasing the configuration "akka.ask.timeout"? 
2. Have you ever checked the garbage collection of JM/TM? Maybe you need to enable printing GC log first.


Biao Liu <[hidden email]> 于2019年6月27日周四 上午11:38写道:
Hi Ken,

In regard to oversized input splits, it seems to be a rare case beyond my expectation. However it should be fixed definitely since input split can be user-defined. We should not assume it must be small. 
I agree with Stephan that maybe there is something unexpectedly involved in the input splits.
And there is also a work-around way to solve this before we fixing it, increasing the limit of RPC message size by explicitly configuring "akka.framesize" in flink-conf.yaml.

To @Qi, also sorry to hear your bad experience. I'll take this issue but I'm not sure I could catch up the releasing of 1.9. Hope things go well.


Stephan Ewen <[hidden email]> 于2019年6月26日周三 下午10:50写道:
Hi Ken!

Sorry to hear you are going through this experience. The major focus on streaming so far means that the DataSet API has stability issues at scale.
So, yes, batch mode in current Flink version can be somewhat tricky.

It is a big focus of Flink 1.9 to fix the batch mode, finally, and by addressing batch specific scheduling / recovery / and shuffle issues.

Let me go through the issues you found:

(1) Input splits and oversized RPC

Your explanation seems correct, timeout due to dropping oversized RPC message.

I don't quite understand how that exactly happens, because the size limit is 10 MB and input splits should be rather small in most cases.
Are you running custom sources which put large data into splits? Maybe accidentally, by having a large serialized closure in the splits?

The fix would be this issue: https://issues.apache.org/jira/browse/FLINK-4399  

(2) TM early release

The 1.8 version had a fix that should work for regular cases without fine-grained failure recovery.
1.9 should have a more general fix that also works for fine-grained recovery

Are you trying to use the finer grained failover with the batch job?
The finer-grained failover is not working in batch for 1.8, that is why it is not an advertised feature (it only works for streaming so far).

The goal is that this works in the 1.9 release (aka the batch fixup release)

(3) Hang in Processing

I think a thread dump (jstack) from the TMs would be helpful to diagnose that.
There are known issues with the current batch shuffle implementation, which is why 1.9 is getting a new bounded-blocking stream shuffle implementation.

Best,
Stephan






On Mon, Jun 24, 2019 at 2:32 AM Ken Krugler <[hidden email]> wrote:
Hi all,

I’ve been running a somewhat complex batch job (in EMR/YARN) with Flink 1.8.0, and it regularly fails, but for varying reasons.

Has anyone else had stability issues with 1.8.0 in batch mode and non-trivial workflows?

Thanks,

— Ken

1. TimeoutException getting input splits

The batch job starts by processing a lot of files that live in S3. During this phase, I sometimes see:

2019-06-20 01:20:22,659 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN DataSource (at createInput(ExecutionEnvironment.java:549) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> Map (ad dailies) -> Filter (Filter at createWorkflow(MyWorkflow.java:34)) -> Filter (Filter at createWorkflow(MyWorkflow.java:36)) -> Filter (Filter at createWorkflow(MyWorkflow.java:38)) -> Map (Key Extractor) -> Combine (Reduce at createWorkflow(MyWorkflow.java:41)) (31/32) (8a8cbea47394c3d638910c36ac62d877) switched from RUNNING to FAILED.
java.lang.RuntimeException: Could not retrieve next input split.
at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
... 3 more
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
... 4 more
2019-06-20 01:20:22,664 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Java Job at Thu Jun 20 01:11:28 UTC 2019 (5564b8980f40d788d7ef312318709e4d) switched from state RUNNING to FAILING.
java.lang.RuntimeException: Could not retrieve next input split.
at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
at org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
... 3 more
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
... 4 more

I saw [hidden email]’s email recently about a similar issue:

I figured this out myself. In my yarn container logs I saw this warning/error,

akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@HOST:43911/temp/$n]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.jobmaster.SerializedInputSplit was 15728643 bytes.

Looking into this, there is a max frame size for Akka, which in Flink can be set with akka.framesize and is 10MB by default. Increasing this past the size of my side input fixed the issue. I'm guessing this is due to creating the side input PCollection from memory using the Create.of APIs.

But no such akka.remote.OversizedPayloadException appears in any of my log files.
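
For reference, a scan of the container logs for that message could look roughly like the sketch below. The class name and the regular expression are illustrative only. The pattern is derived from the single log line quoted above, so it may need adjusting for other Akka versions or log layouts.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink or Akka.
class OversizedPayloadScan {

    // Built from the one example line in this thread; treat it as an assumption.
    private static final Pattern OVERSIZED = Pattern.compile(
            "OversizedPayloadException: .*max allowed size (\\d+) bytes, "
            + "actual size of encoded class (\\S+) was (\\d+) bytes");

    /** Returns {maxAllowedBytes, actualBytes} if the line reports an oversized payload. */
    static Optional<long[]> parse(String line) {
        Matcher m = OVERSIZED.matcher(line);
        if (!m.find()) {
            return Optional.empty();
        }
        return Optional.of(new long[] {Long.parseLong(m.group(1)), Long.parseLong(m.group(3))});
    }

    public static void main(String[] args) throws IOException {
        // Each argument is the path of a log file to scan.
        for (String file : args) {
            List<String> lines = Files.readAllLines(Paths.get(file));
            for (String line : lines) {
                parse(line).ifPresent(sizes -> System.out.printf(
                        "%s: limit %d bytes, actual %d bytes%n", file, sizes[0], sizes[1]));
            }
        }
    }
}
```

If this prints nothing for the JobManager and TaskManager logs, the dropped message is either logged somewhere else or the timeout has a different cause.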

2. TM released too soon?

Sometimes it fails with "Connecting the channel failed: Connecting to remote task manager xxx has failed. This might indicate that the remote task manager has been lost".

I’d run into this previously with 1.7.2, but thought that 1.8.0 had the fix for https://issues.apache.org/jira/browse/FLINK-10941, and thus I’d avoid the problem, but it seems like there’s still an issue.

I’m running 3 TMs on three servers, each with 32 slots. When the job fails, the servers are under heavy CPU load.

From the logs, I see the JobManager releasing two of the TMs, then requesting two new containers. One of these requests gets filled, and that new TM starts getting tasks for its slots.

But then soon afterwards that new TM and the one original TM still left around start failing because they aren’t getting data from (I think) the other TM that was released.

Any thoughts on what’s going wrong? Is the bug not actually fully fixed? Or is there some TM timeout value that I should bump?

In the job manager log file I see where the two TMs are getting released...

2019-05-17 17:42:50,215 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl      - Releasing idle slot [d947cd800b0ef2671259c7b048c3f7fc].
2019-05-17 17:43:38,942 INFO  org.apache.flink.yarn.YarnResourceManager                     - Stopping container container_1558074033518_0003_01_000002.
2019-05-17 17:43:38,978 INFO  org.apache.flink.yarn.YarnResourceManager                     - Closing TaskExecutor connection container_1558074033518_0003_01_000002 because: TaskExecutor exceeded the idle timeout.
2019-05-17 17:43:38,978 INFO  org.apache.flink.yarn.YarnResourceManager                     - Stopping container container_1558074033518_0003_01_000004.
2019-05-17 17:43:38,998 INFO  org.apache.flink.yarn.YarnResourceManager                     - Closing TaskExecutor connection container_1558074033518_0003_01_000004 because: TaskExecutor exceeded the idle timeout.
2019-05-17 17:43:39,005 WARN  org.apache.flink.yarn.YarnResourceManager                     - Discard registration from TaskExecutor container_1558074033518_0003_01_000002 at (akka.tcp://[hidden email]:36311/user/taskmanager_0) because the framework did not recognize it
2019-05-17 17:43:39,006 WARN  org.apache.flink.yarn.YarnResourceManager                     - Discard registration from TaskExecutor container_1558074033518_0003_01_000004 at (akka.tcp://[hidden email]:44403/user/taskmanager_0) because the framework did not recognize it

And then later on the requests for the replacement TMs.

2019-05-17 17:45:01,655 INFO  org.apache.flink.yarn.YarnResourceManager                     - Requesting new TaskExecutor container with resources <memory:44000, vCores:32>. Number pending requests 1.
2019-05-17 17:45:01,662 INFO  org.apache.flink.yarn.YarnResourceManager                     - Requesting new TaskExecutor container with resources <memory:44000, vCores:32>. Number pending requests 2.

And then one of the requests is satisfied:

2019-05-17 17:45:04,360 INFO  org.apache.flink.yarn.YarnResourceManager                     - Received new container: container_1558074033518_0003_01_000006 - Remaining pending container requests: 2
2019-05-17 17:45:04,360 INFO  org.apache.flink.yarn.YarnResourceManager                     - Removing container request Capability[<memory:44000, vCores:32>]Priority[1]. Pending container requests 1.
2019-05-17 17:45:04,836 INFO  org.apache.flink.yarn.YarnResourceManager                     - Creating container launch context for TaskManagers
2019-05-17 17:45:04,837 INFO  org.apache.flink.yarn.YarnResourceManager                     - Starting TaskManagers

So it seems like TMs are being allocated, but soon afterwards:

2019-05-17 17:45:12,907 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN Map (Map at createWorkflow(MyWorkflow.java:127)) -> Map (Key Extractor) (73/96) (1e16d6ca293330933b7cece67644635f) switched from RUNNING to FAILED.
java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'ip-10-28-81-66.ec2.internal/10.28.81.66:40317' has failed. This might indicate that the remote task manager has been lost.
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:133)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:69)
	at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:60)
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:166)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:494)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:525)
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508)
	at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
	at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
	at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
	at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:101)
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)

On one of the TMs that was released, I see at the end of its log:

2019-05-17 17:42:50,217 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable      - Free slot TaskSlot(index:3, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.7976931348623157E308, heapMemoryInMB=2147483647, directMemoryInMB=2147483647, nativeMemoryInMB=2147483647, networkMemoryInMB=2147483647}, allocationId: e3e7b383fe2db6376c82e5f3be7e02cb, jobId: eff57179c5c0e7d475c3b69d1a063017).
2019-05-17 17:42:50,217 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Remove job eff57179c5c0e7d475c3b69d1a063017 from job leader monitoring.
2019-05-17 17:42:50,217 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close JobManager connection for job eff57179c5c0e7d475c3b69d1a063017.
2019-05-17 17:42:50,222 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close JobManager connection for job eff57179c5c0e7d475c3b69d1a063017.
2019-05-17 17:42:50,222 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Cannot reconnect to job eff57179c5c0e7d475c3b69d1a063017 because it is not registered.
2019-05-17 17:43:38,982 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close ResourceManager connection 2c23394abcc92fbc068529591fbf7ceb.
2019-05-17 17:43:38,982 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Connecting to ResourceManager akka.tcp://[hidden email]:35979/user/resourcemanager(00000000000000000000000000000000).
2019-05-17 17:43:38,988 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
2019-05-17 17:43:38,988 INFO  org.apache.flink.runtime.blob.PermanentBlobCache              - Shutting down BLOB cache
2019-05-17 17:43:38,989 INFO  org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  - Shutting down TaskExecutorLocalStateStoresManager.
2019-05-17 17:43:38,990 INFO  org.apache.flink.runtime.blob.TransientBlobCache              - Shutting down BLOB cache
2019-05-17 17:43:38,991 INFO  org.apache.flink.runtime.filecache.FileCache                  - removed file cache directory /mnt/yarn/usercache/hadoop/appcache/application_1558074033518_0003/flink-dist-cache-a5e9444f-6bb6-4da1-9067-8d2e7351cb61
2019-05-17 17:43:38,991 INFO  org.apache.flink.runtime.filecache.FileCache                  - removed file cache directory /mnt1/yarn/usercache/hadoop/appcache/application_1558074033518_0003/flink-dist-cache-925eed9f-fcf5-4d48-8ac9-bce29e9116ef
2019-05-17 17:43:39,004 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Resolved ResourceManager address, beginning registration
2019-05-17 17:43:39,004 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Registration at ResourceManager attempt 1 (timeout=100ms)
2019-05-17 17:43:39,012 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Registration at ResourceManager was declined: unrecognized TaskExecutor
2019-05-17 17:43:39,012 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Pausing and re-attempting registration in 30000 ms

And in the replacement TM that was started, it fails with:

2019-05-17 17:45:12,048 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  Map (Key Extractor) (34/96)
java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'ip-10-47-197-146.ec2.internal/10.47.197.146:39133' has failed. This might indicate that the remote task manager has been lost.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)

Where the TM it’s trying to connect to is the one that was released and hasn’t been restarted yet.
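
To make the timing easier to see, the gaps between the log entries quoted above can be computed as in this small sketch. The class and method names are illustrative, and the timestamps are copied from the JobManager log lines in this message.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for reading the timeline out of the quoted logs.
class TmReleaseTimeline {

    // Timestamp layout used by the log lines in this thread.
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Milliseconds elapsed between two log timestamps. */
    static long millisBetween(String from, String to) {
        return Duration.between(
                LocalDateTime.parse(from, LOG_TS),
                LocalDateTime.parse(to, LOG_TS)).toMillis();
    }

    public static void main(String[] args) {
        String slotReleased = "2019-05-17 17:42:50,215";         // "Releasing idle slot"
        String containerStopped = "2019-05-17 17:43:38,942";     // "Stopping container ..."
        String replacementRequested = "2019-05-17 17:45:01,655"; // "Requesting new TaskExecutor container"
        String taskFailed = "2019-05-17 17:45:12,907";           // "switched from RUNNING to FAILED"

        System.out.println("idle slot release -> container stop (ms): "
                + millisBetween(slotReleased, containerStopped));
        System.out.println("container stop -> replacement request (ms): "
                + millisBetween(containerStopped, replacementRequested));
        System.out.println("replacement request -> task failure (ms): "
                + millisBetween(replacementRequested, taskFailed));
    }
}
```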

3. Hang in processing

Sometimes it finishes the long-running (10 hour) operator, and then the two downstream operators get stuck (these have a different parallelism, so there’s a rebalance).

In the most recent example of this, they processed about 20% of the data emitted by the long-running operator. There are no errors in any of the logs. The last real activity in the jobmanager.log shows that all of the downstream operators were deployed...

2019-06-22 14:58:36,648 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN Map (Packed features) -> Map (Key Extractor) (7/32) (4a13a1d471c0ed5c2d9e66d2e4a98fd9) switched from DEPLOYING to RUNNING.

Then nothing anywhere, until this msg starts appearing in the log file every 5 seconds or so…

2019-06-22 22:56:11,303 INFO  org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl         - Updating with new AMRMToken




--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra