|
Hi Till, thanks for the feedback and suggestion.
I think it makes sense to support only flink-dist-*.jar as the first step. As you suggested, the config option could be "yarn.submission.automatic-flink-dist-upload", with a default of true. Users could use "-yt/--yarnship" to specify an HDFS path that contains flink-dist-*.jar and set the above config option to "false" to disable uploading flink-dist-*.jar.
Best, Yang
Thanks for the clarification, Yang. Now it makes sense to me.
If it makes things easier, then I would still go first with the dead simple solution to turn automatic upload of local dist off via a configuration option before trying to implement a smart solution which relies on pattern matching or something else. For example, users might specify a remote location which is not accessible from the client. Then one could not figure out which files are already uploaded. The smart solution could be a follow up step then.
Cheers, Till
Hi Till,
Sorry for not giving a detailed explanation of the optimization. It consists of the following two parts.
* Use remote, already uploaded jars to avoid unnecessary uploading (e.g. flink-dist-*.jar, user jars, dependencies). This could be done by enriching "-yt/--yarnship" to support remote ship files.
* Use the "PUBLIC" or "PRIVATE" visibility of YARN local resources to avoid unnecessary downloading. When a local resource is public, once it has been downloaded by the YARN NodeManager, it can be reused by all applications on the same NodeManager.
>> Why do we need to specify the visibility of the remote files? Won't the visibility be specified when uploading these files?
It is mostly for users who want to eliminate unnecessary downloading so that containers can be launched faster. "PRIVATE" means the remote jars can be shared by the applications submitted by the current user. "PUBLIC" means the remote jars can be shared by all Flink applications. And "APPLICATION" means they can only be shared by the containers of the current application on the same NodeManager.
For the implementation, I think we could do it step by step.
* Enrich "-yt/--yarnship" to support HDFS directories
* Add a new config option to control whether to avoid unnecessary uploading
* Enrich "-yt/--yarnship" to specify local resource visibility
Best, Yang
Shall we say that for the first version we can only deactivate the upload of local files, instead of doing some optimizations? I guess my problem is that I don't fully understand the optimizations yet. Maybe we introduce a power user config option `yarn.submission.automatic-flink-dist-upload` or so.
Why do we need to specify the visibility of the remote files? Won't the visibility be specified when uploading these files?
Apart from that, the proposal looks good to me.
Cheers, Till
Hi tison,
I think I get your concerns and points.
Taking both FLINK-13938[1] and FLINK-14964[2] into account, I will proceed in the following steps.
* Enrich "-yt/--yarnship" to support HDFS directories
* Enrich "-yt/--yarnship" to specify local resource visibility. It is "APPLICATION" by default. It could also be configured to "PUBLIC", which means shared by all applications, or "PRIVATE", which means shared by the same user.
* Add a new config option to control whether to optimize the submission (default is false). When configured to true, the Flink client will try to filter the jars and files by name and size to avoid unnecessary uploading.
A very rough submission command could be issued as follows.
./bin/flink run -m yarn-cluster -d \
    -yt hdfs://myhdfs/flink/release/flink-1.11:PUBLIC,hdfs://myhdfs/user/someone/mylib \
    -yD yarn.submission-optimization.enable=true \
    examples/streaming/WindowJoin.jar
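A minimal sketch of how a "-yt" value of this shape might be split into paths and visibilities. The `path:VISIBILITY` suffix syntax is only the rough proposal from this mail, and `parse_ship_files` is a hypothetical helper name, not an existing Flink function.

```python
from typing import List, Tuple

# The three YARN local resource visibilities discussed in this thread.
VISIBILITIES = {"PUBLIC", "PRIVATE", "APPLICATION"}


def parse_ship_files(value: str, default: str = "APPLICATION") -> List[Tuple[str, str]]:
    """Split a comma-separated ship-file value into (path, visibility) pairs.

    Assumption: an optional ':VISIBILITY' suffix follows each path, as in the
    rough command above. Entries without a suffix get the default visibility.
    """
    result = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue
        # Only the text after the last ':' is a candidate suffix, and only if
        # it is a known visibility. This leaves the ':' in 'hdfs://' or in a
        # 'host:port' authority untouched.
        path, sep, suffix = entry.rpartition(":")
        if sep and suffix.upper() in VISIBILITIES:
            result.append((path, suffix.upper()))
        else:
            result.append((entry, default))
    return result


if __name__ == "__main__":
    spec = "hdfs://myhdfs/flink/release/flink-1.11:PUBLIC,hdfs://myhdfs/user/someone/mylib"
    for path, visibility in parse_ship_files(spec):
        print(visibility, path)
```

With the value from the command above, the first directory would be registered as "PUBLIC" and the second would fall back to "APPLICATION".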
cc [hidden email], since you also helped review the old PR of FLINK-13938, maybe you could also share some thoughts.
Best, Yang
Hi Yang,
Name filtering & special handling of the scheme make sense to me. We can enrich this later if there is a requirement, without breaking the interface.
For #1, from my perspective your first proposal is having an option that specifies a remote flink/lib; we then turn off auto-uploading of the local flink/lib and register that path as local resources.
It seems we are adding yet another piece of special logic to handle one kind of thing. What I propose is that we keep these two steps explicitly separate:
1. an option that turns off auto-uploading of the local flink/lib
2. a general option that registers remote files as local resources
The remaining point is that you propose handling flink/lib with PUBLIC visibility and other files with APPLICATION visibility. Either a composite configuration or name filtering to special-case the libs makes sense, though.
YarnClusterDescriptor already has a lot of special handling logic, which introduces a number of config options and keys that should have been configured through a few common options and validated at runtime.
Hi tison,
For #3, if you mean registering a remote HDFS file as a local resource, we should make "-yt/--yarnship" support remote directories. I think it is the right direction.
For #1, if users could ship remote directories, then they could also specify something like "-yt hdfs://hdpdev/flink/release/flink-1.x, hdfs://hdpdev/user/someone/mylib". Do you mean we add an option for whether to try to avoid unnecessary uploading? Maybe we could filter by name and file size. I think this is a good suggestion, and we do not need to introduce a new config option "-ypl".
For #2, for flink-dist, #1 could already solve the problem. We do not need to support a remote scheme. It will confuse users if we only support HDFS and not S3, OSS, etc.
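The filtering by name and file size mentioned for #1 could be sketched roughly as follows. This is only an illustration under assumptions: `files_to_upload` is a hypothetical helper, and both listings are assumed to be available as plain name-to-size mappings rather than coming from any real Flink or Hadoop API.

```python
from typing import Dict, List


def files_to_upload(local: Dict[str, int], remote: Dict[str, int]) -> List[str]:
    """Return the names of local files that still need to be uploaded.

    Both arguments map a file name to its size in bytes. A local file is
    skipped only when a remote file with the same name and the same size
    already exists; everything else is uploaded.
    """
    return sorted(name for name, size in local.items() if remote.get(name) != size)


if __name__ == "__main__":
    local_files = {"flink-dist_2.11-1.9.1.jar": 100, "mylib.jar": 20, "job.jar": 5}
    remote_files = {"flink-dist_2.11-1.9.1.jar": 100, "mylib.jar": 21}
    print(files_to_upload(local_files, remote_files))
```

Matching on name and size is cheap but cannot detect two different files that happen to have the same name and size, so it trades some safety for speed.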
Best, Yang
Hi Yang,
I agree that these two pieces of work would benefit from a single assignee. My concerns are as follows.
1. Both shared libs & remote flink dist/libs are remote ship files. I don't think we have to implement multiple code paths/configurations.
2. So, for concept clarification, there are (1) an option to disable shipping local libs, (2) flink-dist supporting multiple schemes, at least "hdfs://" as we said, and (3) an option for registering remote ship files with path & visibility. I think the new configuration system helps.
The reason we have to handle (2) specially instead of including it in (3) is that when shipping flink-dist to the TM container, we specially detect flink-dist. Of course we could merge it into the general ship files and validate that the ship files finally contain flink-dist, which is an alternative.
The *most important* difference is between (1) and (3): we don't have an option for using only remote libs. Does this clarification satisfy your proposal?
Hi Yang,
From what I understand, it sounds reasonable to me. Could you sync with Tison on FLINK-14964 on how to proceed? I'm not super deep into these issues, but they seem to be somewhat related and Tison already did some implementation work.
I'd say it would be awesome if we could include this kind of improvement in the release.
Cheers, Till
Hi All, thanks a lot for reviving this discussion.
I think we could unify FLINK-13938 and FLINK-14964 since they have a similar purpose: avoiding unnecessary uploading and downloading of jars in YARN deployments. The difference is that FLINK-13938 aims to support the Flink system lib directory only, while FLINK-14964 is trying to support arbitrary pre-uploaded jars (including user and system jars).
So I suggest doing this feature as follows.
1. Upload the Flink lib directory or user libs to HDFS, e.g. "hdfs://hdpdev/flink/release/flink-1.x" and "hdfs://hdpdev/user/someone/mylib"
2. Use the -ypl argument to specify the shared lib; multiple directories could be specified
3. YarnClusterDescriptor will use the pre-uploaded jars to avoid unnecessary uploading, both for system and user jars
4. YarnClusterDescriptor needs to set the system jars to public visibility so that the distributed cache in the YARN NodeManager can be reused by multiple applications. This is to avoid unnecessary downloading, especially for "flink-dist-*.jar". For the user shared lib, the visibility is still set to the "APPLICATION" level.
In our past internal use case, the shared lib helped accelerate the submission a lot. It also helps reduce the pressure on HDFS when we want to launch many applications together.
I will try to find some time to work on this and hope it can catch up with the release-1.1 cycle.
Best, Yang
Okay, I’ll continue to watch the JIRAs. Thanks for the update, Till.
//
ah
From: Till Rohrmann <[hidden email]>
Sent: Wednesday, April 15, 2020 10:51 AM
To: Hailu, Andreas [Engineering] <[hidden email]>
Cc: Yang Wang <[hidden email]>; tison <[hidden email]>; [hidden email]
Subject: Re: Flink Conf "yarn.flink-dist-jar" Question
Hi Andreas,
it looks as if FLINK-13938 and FLINK-14964 won't make it into the 1.10.1 release because the community is about to start the release process. Since FLINK-13938 is a new feature, it will be shipped with a major release. There is still a bit of time until the 1.11 feature freeze, and if Yang Wang has time to finish this PR, then we could ship it.
On Wed, Apr 15, 2020 at 3:23 PM Hailu, Andreas [Engineering] <[hidden email]> wrote:
Yang, Tison,
Do we know when some solution for 13938 and 14964 will arrive? Do you think it will be in a 1.10.x version?
Hi Yang,
This is good to know. As a stopgap measure until a solution between 13938 and 14964 arrives, we can automate the application staging directory cleanup from our client should the process fail. It’s not ideal, but will at least begin to manage our users’ quota. I’ll continue to watch the two tickets. Thank you.
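A minimal sketch of what selecting stale staging directories for such a client-side cleanup might look like. It only decides which directory names are candidates; how the names under /user/<user>/.flink are listed, how the set of still-active application IDs is obtained, and how the directories are deleted are all left to the caller. `stale_staging_dirs` is a hypothetical helper name.

```python
import re
from typing import Iterable, List, Set

# Staging directories are named after the YARN application ID,
# e.g. application_1583031705852_117863.
APP_DIR = re.compile(r"^application_\d+_\d+$")


def stale_staging_dirs(dir_names: Iterable[str], active_app_ids: Set[str]) -> List[str]:
    """Return staging directory names whose application is no longer active.

    Entries that do not look like an application ID are ignored, so that
    unrelated files in the same parent directory are never selected.
    """
    return sorted(
        name
        for name in dir_names
        if APP_DIR.match(name) and name not in active_app_ids
    )


if __name__ == "__main__":
    listed = [
        "application_1583031705852_117863",
        "application_1583031705852_117864",
        "notes.txt",
    ]
    active = {"application_1583031705852_117864"}
    print(stale_staging_dirs(listed, active))
```

Keeping the selection separate from the deletion makes it easy to log or dry-run the candidates before anything is removed.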
//
ah
From: Yang Wang <[hidden email]>
Sent: Monday, March 16, 2020 9:37 PM
To: Hailu, Andreas [Engineering] <[hidden email]>
Cc: tison <[hidden email]>; [hidden email]
Subject: Re: Flink Conf "yarn.flink-dist-jar" Question
Hi Hailu,
Sorry for the late response. If the Flink cluster (e.g. the YARN application) is stopped directly by `yarn application -kill`, then the staging directory will be left behind, since the JobManager does not have any chance to clean up the staging directory. It may also happen when the JobManager crashes and reaches the attempt limit of YARN.
For FLINK-13938, yes, it is trying to use the Yarn public cache to accelerate the container
Also, may I ask what causes these application ID directories to be left behind? Is it a job failure, or can they persist even if the application succeeds? I’d like to know so that I can implement my own cleanup in the interim to prevent exceeding user disk space quotas.
Hi Yang,
Yes, a combination of these two would be very helpful for us. We have a single shaded binary which we use to run all of the jobs on our YARN cluster. If we could designate a single location in HDFS for that as well, we could also greatly benefit from FLINK-13938.
It sounds like a general public cache solution is what’s being called for?
//
ah
From: Yang Wang <[hidden email]>
Sent: Sunday, March 8, 2020 10:52 PM
To: Hailu, Andreas [Engineering] <[hidden email]>
Cc: tison <[hidden email]>; [hidden email]
Subject: Re: Flink Conf "yarn.flink-dist-jar" Question
Hi Hailu, tison,
I created a very similar ticket before to accelerate Flink submission on YARN[1]. However, we did not reach a consensus in the PR. Maybe it's time to revive the discussion and try to find a common solution for both tickets[1][2].
Hi Tison, thanks for the reply. I’ve replied to the ticket. I’ll be watching it as well.
//
ah
From: tison <[hidden email]>
Sent: Friday, March 6, 2020 1:40 PM
To: Hailu, Andreas [Engineering] <[hidden email]>
Cc: [hidden email]
Subject: Re: Flink Conf "yarn.flink-dist-jar" Question
FLINK-13938 seems a bit different from your requirement. The one that totally matches is FLINK-14964.
I'd appreciate it if you could share your opinion on the JIRA ticket.
Yes, your requirement is exactly what the community has taken into consideration. We currently have an open JIRA ticket for this specific feature[1], and work on loosening the constraint on the flink-jar scheme to support DFS locations should happen.
Hi,
We noticed that every time an application runs, it uploads the flink-dist artifact to the /user/<user>/.flink HDFS directory. This causes a user disk space quota issue as we submit thousands of apps to our cluster an hour. We had a similar problem with our Spark applications, where the Spark Assembly package was uploaded for every app. Spark provides an argument to use a location in HDFS for its applications to leverage so they don’t need to upload it for every run, and that was our solution (see the “spark.yarn.jar” configuration if interested).
Looking at the Resource Orchestration Frameworks page, I see there might be a similar concept through a “yarn.flink-dist-jar” configuration option. I wanted to place the flink-dist package we’re using in a location in HDFS and configure our jobs to point to it, e.g.
yarn.flink-dist-jar: hdfs:////user/delp/.flink/flink-dist_2.11-1.9.1.jar
Am I correct in that this is what I’m looking for? I gave this a try with some jobs today, and based on what I’m seeing in the launch_container.sh in our YARN application, it still looks like it’s being uploaded:
export _FLINK_JAR_PATH="hdfs://d279536/user/delp/.flink/application_1583031705852_117863/flink-dist_2.11-1.9.1.jar"
How can I confirm? Or is this perhaps not the config I’m looking for?
Best,
Andreas
Your Personal Data: We may collect and process information about you that may be subject to data protection laws. For more information about how we use and disclose your personal data, how we protect your information, our legal basis to use your information,
your rights and who you can contact, please refer to:
www.gs.com/privacy-notices
|