Job Manager taking long time to upload job graph on remote storage

Prakhar Mathur
Hi,

We are currently running Flink 1.9.0. We see a delay of around 20 seconds when starting a job on a session Flink cluster. We start the job using Flink's monitoring REST API, with our jar already uploaded to the Job Manager. The jar file is around 200 MB. We are using the memory state backend with GCS as remote storage.
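For reference, the submission flow looks roughly like this (a sketch assuming the standard monitoring REST endpoints on port 8081; the host, jar name, jar id, and entry class are placeholders):

# upload the fat jar once to the session cluster's Job Manager
curl -X POST -F "jarfile=@my-job.jar" http://<jobmanager-host>:8081/jars/upload
# later, start the job from the already-uploaded jar -- this is the step
# where we see the ~20 second delay
curl -X POST "http://<jobmanager-host>:8081/jars/<jar-id>/run?entry-class=com.example.MyJob"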

Running the cluster in debug mode, we observed that generating the plan alone takes around 6 seconds and copying the job graph from the local directory to the remote folder takes around 10 seconds.

Is this delay expected, or can it be reduced by tweaking some configuration?

Thank you. Regards
Prakhar Mathur

Re: Job Manager taking long time to upload job graph on remote storage

Till Rohrmann
Hi Prakhar,

Have you enabled HA for your cluster? If yes, then Flink will try to store the job graph in the configured high-availability.storageDir in order to be able to recover it. If this operation takes a long time, then either the filesystem is slow or storing the pointer in ZooKeeper is slow. If it is the filesystem, then I would suggest checking whether you have some read/write quotas which might slow the operation down.
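For context, a sketch of the HA settings involved in flink-conf.yaml (the keys are the documented ones; the bucket and ZooKeeper hosts below are placeholders, not taken from this thread):

$ grep high-availability conf/flink-conf.yaml
high-availability: zookeeper
high-availability.storageDir: gs://<some-bucket>/flink/ha
high-availability.zookeeper.quorum: zk-0:2181,zk-1:2181,zk-2:2181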

If you haven't enabled HA, or persisting the JobGraph is not what takes long, then the next most likely candidate is recovery from a previous checkpoint. Here again, Flink needs to read from the remote storage (in your case GCS), and depending on the size of the checkpoint and the read bandwidth this can be faster or slower. The best way to figure out where the time goes is to share the logs with us so that we can confirm which step is slow.

To sum it up, the job submission is most likely slow because of the interplay between Flink and an external system (most likely your configured filesystem). If the filesystem is somewhat throttled, then Flink cannot do much about it.

What you could try is to check whether your jar contains dependencies which are not needed (e.g. Flink dependencies, which are usually provided by the system). That way you could decrease the size of the jar a bit.
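A quick way to check that (a sketch; my-job.jar is a placeholder name):

# count the Flink runtime classes bundled inside the fat jar; a large number
# suggests those dependencies could be marked as provided in the build
jar tf my-job.jar | grep -c '^org/apache/flink/'
# show the biggest top-level packages in the jar
jar tf my-job.jar | cut -d/ -f1,2 | sort | uniq -c | sort -rn | head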

Cheers,
Till

Re: Job Manager taking long time to upload job graph on remote storage

Prakhar Mathur
Hi,

Thanks for the response. Yes, we are running Flink in HA mode. We checked, and there are no such quota limits on GCS for us. Please find the logs below; you can see that copying the blob started at 11:50:39,455 and the JobGraph submission was received at 11:50:46,400.

2020-09-01 11:50:37,061 DEBUG org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  - Release TaskExecutor 2e20ee286a3fee5831fefc0ab427ba92 because it exceeded the idle timeout.
2020-09-01 11:50:37,061 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Worker 4e4ae8b90f911787ac112c2847759512 could not be stopped.
2020-09-01 11:50:37,062 DEBUG org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  - Release TaskExecutor 032a62eff2a7d8067f25b8fc943f262f because it exceeded the idle timeout.
2020-09-01 11:50:37,062 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Worker 2532ed667a2566e06c3d9e3bc85c6ed6 could not be stopped.
2020-09-01 11:50:37,305 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Trigger heartbeat request.
2020-09-01 11:50:37,305 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Trigger heartbeat request.
2020-09-01 11:50:37,354 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Received heartbeat from 4e4ae8b90f911787ac112c2847759512.
2020-09-01 11:50:37,354 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Received heartbeat from 2532ed667a2566e06c3d9e3bc85c6ed6.
2020-09-01 11:50:39,455 DEBUG org.apache.flink.runtime.blob.FileSystemBlobStore             - Copying from /tmp/flink-blobs/blobStore-6e468470-ba5f-4ea0-a8fd-cf31af663f11/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426 to gs://<test-bucket>/blob/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426.
2020-09-01 11:50:43,904 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got ping response for sessionid: 0x30be3d929102460 after 2ms
2020-09-01 11:50:46,400 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Received JobGraph submission 980d3ff229b7fbfe889e2bc93e526da0 (cli-test-001).
2020-09-01 11:50:46,403 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Submitting job 980d3ff229b7fbfe889e2bc93e526da0 (cli-test-001).
2020-09-01 11:50:46,405 DEBUG org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - Adding job graph 980d3ff229b7fbfe889e2bc93e526da0 to flink/cluster/jobgraphs/980d3ff229b7fbfe889e2bc93e526da0.
2020-09-01 11:50:47,325 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Trigger heartbeat request.
2020-09-01 11:50:47,325 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Trigger heartbeat request.
2020-09-01 11:50:47,325 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Trigger heartbeat request.
2020-09-01 11:50:47,325 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Trigger heartbeat request.
2020-09-01 11:50:47,330 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Received heartbeat from 4e4ae8b90f911787ac112c2847759512.
2020-09-01 11:50:47,331 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Received heartbeat from 2532ed667a2566e06c3d9e3bc85c6ed6.
2020-09-01 11:50:52,880 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got notification sessionid:0x30be3d929102460
2020-09-01 11:50:52,880 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/flink/cluster/jobgraphs for sessionid 0x30be3d929102460
2020-09-01 11:50:52,882 INFO  org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - Added SubmittedJobGraph(980d3ff229b7fbfe889e2bc93e526da0) to ZooKeeper.

Thank You.

Re: Job Manager taking long time to upload job graph on remote storage

Till Rohrmann
The logs don't look suspicious. Could you maybe check what the write bandwidth to your GCS bucket is from the machine you are running Flink on? It should be enough to generate a 200 MB file and write it to GCS. Thanks a lot for your help in debugging this matter.
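Something like the following, run from the machine the Job Manager runs on, would give a rough number (a sketch; the bucket name is a placeholder):

# create a ~200 MB file and time the upload to the bucket the blob store uses
dd if=/dev/urandom of=/tmp/bw-test.bin bs=1M count=200
time gsutil cp /tmp/bw-test.bin gs://<test-bucket>/bw-test.bin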

Cheers,
Till

Re: Job Manager taking long time to upload job graph on remote storage

Prakhar Mathur
We tried uploading the same blob directly from the Job Manager k8s pod to GCS using gsutil, and it took 2 seconds. The upload speed was 166.8 MiB/s. Thanks.

Re: Job Manager taking long time to upload job graph on remote storage

Till Rohrmann
Hmm, then that probably rules GCS out. What about ZooKeeper? Have you experienced slow response times from your ZooKeeper cluster?
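One quick way to check that (a sketch, assuming the ZooKeeper four-letter-word commands are enabled; zk-0 is a placeholder host):

# 'srvr' reports min/avg/max request latency for that ZooKeeper server
echo srvr | nc zk-0 2181
# 'mntr' gives the same figures in key/value form
echo mntr | nc zk-0 2181 | grep -E 'latency|outstanding'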

Cheers,
Till

Re: Job Manager taking long time to upload job graph on remote storage

Prakhar Mathur
Yes, I will check that, but do you have any pointers on why Flink takes more time than a plain gsutil upload?

Re: Job Manager taking long time to upload job graph on remote storage

Till Rohrmann
From the log snippet it is hard to tell. Flink is not only interacting with GCS but also with ZooKeeper to store a pointer to the serialized JobGraph, and that can also take some time. Then of course, there could be an issue with the GCS filesystem implementation you are using, and the filesystem throughput could also vary a bit over time. In the logs you shared, uploading the blob takes at most 7 seconds; in another run you stated that it took 10 seconds.

Have you tried whether the same behaviour is observable with the latest Flink version?

Cheers,
Till

Re: Job Manager taking long time to upload job graph on remote storage

Prakhar Mathur
Yes, we can try the same with 1.11. Meanwhile, is there any network- or thread-related config that we can tweak for this?

Re: Job Manager taking long time to upload job graph on remote storage

Till Rohrmann
I am not sure at this point that the delay is caused by Flink; I would rather suspect that it has something to do with an external system. Maybe you could try profiling the job submission so that we can see more clearly where the time is spent. Other than that, there might be some options for the GCS filesystem implementation that one could take a look at.
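A rough way to do that without extra tooling (a sketch; the pod name is a placeholder, it assumes the JDK's jstack is available in the image and the JVM runs as PID 1, and it only samples stack traces rather than doing full profiling):

# take a few Job Manager thread dumps while the job is being submitted
for i in 1 2 3 4 5; do
  kubectl exec <jobmanager-pod> -- jstack 1 > /tmp/jm-stack-$i.txt
  sleep 2
done
# then look at where the blob upload thread spends its time
grep -A 15 -E 'FileSystemBlobStore|BlobServer' /tmp/jm-stack-*.txt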

Cheers,
Till
