Hi All,

I created FLINK-12214 <https://issues.apache.org/jira/browse/FLINK-12214> to add a JobListener (hook) to the Flink job lifecycle. Since this is a new public API for Flink, I'd like to discuss it more widely in the community to get more feedback.

The background and motivation is that I am integrating Flink into Apache Zeppelin <http://zeppelin.apache.org/> (which is a notebook, in case you don't know). I'd like to capture some job context (like the jobId) during the lifecycle of a Flink job (submission, execution, cancellation) so that I can control the job at a finer granularity. For example, I can capture the jobId when the job is submitted and associate it with one paragraph; when the user clicks the cancel button, I can call the Flink cancel API to cancel this job.

I believe other projects that integrate Flink would need a similar mechanism. I plan to add an addJobListener API to ExecutionEnvironment/StreamExecutionEnvironment so that users can add customized hooks to the Flink job lifecycle.

Here's the draft JobListener interface:

    public interface JobListener {
        void onJobSubmitted(JobID jobId);
        void onJobExecuted(JobExecutionResult jobResult);
        void onJobCanceled(JobID jobId, String savepointPath);
    }

Let me know your comments and concerns, thanks.

Best Regards
Jeff Zhang
|
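To make the notebook use case concrete, here is a minimal, self-contained sketch of how an integration might implement the draft interface. JobID and JobExecutionResult below are simplified stand-ins for the Flink classes, and ParagraphJobTracker and its methods are hypothetical names that exist in neither Flink nor Zeppelin.

```java
// Simplified stand-in for Flink's JobID (assumption for this sketch).
final class JobID {
    private final String value;
    JobID(String value) { this.value = value; }
    @Override public String toString() { return value; }
}

// Simplified stand-in for Flink's JobExecutionResult (assumption for this sketch).
final class JobExecutionResult {
    private final JobID jobId;
    JobExecutionResult(JobID jobId) { this.jobId = jobId; }
    JobID getJobID() { return jobId; }
}

// The draft interface as proposed in this mail.
interface JobListener {
    void onJobSubmitted(JobID jobId);
    void onJobExecuted(JobExecutionResult jobResult);
    void onJobCanceled(JobID jobId, String savepointPath);
}

// Hypothetical notebook-side listener: one instance per paragraph,
// remembering the job that paragraph is currently running.
final class ParagraphJobTracker implements JobListener {
    private final String paragraphId;
    private volatile JobID currentJob; // null while no job is running

    ParagraphJobTracker(String paragraphId) { this.paragraphId = paragraphId; }

    @Override
    public void onJobSubmitted(JobID jobId) {
        // Capture the id at submission so a later "cancel" click can find the job.
        currentJob = jobId;
    }

    @Override
    public void onJobExecuted(JobExecutionResult jobResult) {
        // The job has finished; there is nothing left to cancel.
        currentJob = null;
    }

    @Override
    public void onJobCanceled(JobID jobId, String savepointPath) {
        currentJob = null;
    }

    String paragraphId() { return paragraphId; }

    // Returns the job a cancel button should target, or null if none is running.
    JobID jobToCancel() { return currentJob; }
}
```

The cancel handler of the notebook would look up jobToCancel() and pass that id to whatever cancel API the integration uses; that call is deliberately left out because its exact form is not part of the proposal.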
Hi Jeff,

I personally like this proposal. From the perspective of programmability, the JobListener would make Flink much easier to work with for third-party programs.

The scenario where I need the listener is the Flink cube engine for Apache Kylin. In that case, the Flink job program is embedded into Kylin's executable context.

If we could have this listener, it would be easier to integrate with Kylin.

Best,
Vino

On Thu, Apr 18, 2019 at 1:30 PM Jeff Zhang <[hidden email]> wrote:
|
Thanks for starting this discussion Jeff. I can see the need for additional hooks for third party integrations.

The thing I'm wondering is whether we really need/want to expose a JobListener via the ExecutionEnvironment. The ExecutionEnvironment is usually used by the user who writes the code and this person (I assume) would not be really interested in these callbacks. If he would, then one should rather think about a better programmatic job control where the `ExecutionEnvironment#execute` call returns a `JobClient` instance. Moreover, we would effectively make this part of the public API and every implementation would need to offer it.

In your case, it could be sufficient to offer some hooks for the ClusterClient or being able to provide a custom ClusterClient. The ClusterClient is the component responsible for the job submission and retrieval of the job result and, hence, would be able to signal when a job has been submitted or completed.

Cheers,
Till

On Thu, Apr 18, 2019 at 8:57 AM vino yang <[hidden email]> wrote:
|
> The ExecutionEnvironment is usually used by the user who writes the code and this person (I assume) would not be really interested in these callbacks.

Usually the ExecutionEnvironment is used by the user who writes the code, but it doesn't need to be created and configured by this person. E.g. in a Zeppelin notebook, the ExecutionEnvironment is created by Zeppelin, and the user just uses it to write the Flink program. You are right that the end user would not be interested in these callbacks, but a third-party library that integrates with Zeppelin would be.

> In your case, it could be sufficient to offer some hooks for the ClusterClient or being able to provide a custom ClusterClient.

Actually, in my initial PR (https://github.com/apache/flink/pull/8190), I do pass the JobListener to the ClusterClient and invoke it there. But IMHO, ClusterClient is not supposed to be a public API for users. Instead, JobClient is the public API that users should use to control a job. So adding hooks to ClusterClient directly and providing a custom ClusterClient doesn't make sense to me. IIUC, you are suggesting the following approach

    env.getClusterClient().addJobListener(jobListener)

but I don't see its benefit compared to this:

    env.addJobListener(jobListener)

Overall, I think adding hooks is orthogonal to fine-grained job control. I agree that we should refactor the Flink client component, but I don't think it would affect the JobListener interface. What do you think?

On Thu, Apr 18, 2019 at 8:57 PM Till Rohrmann <[hidden email]> wrote:
Best Regards
Jeff Zhang |
I think we should not expose the ClusterClient configuration via the ExecutionEnvironment (env.getClusterClient().addJobListener) because this is effectively the same as exposing the JobListener interface directly on the ExecutionEnvironment. Instead I think it could be possible to provide a ClusterClient factory which is picked up from the Configuration or some other mechanism for example. That way it would not need to be exposed via the ExecutionEnvironment at all.

Cheers,
Till

On Fri, Apr 19, 2019 at 11:12 AM Jeff Zhang <[hidden email]> wrote:
|
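As a rough illustration of the "factory picked up from the Configuration" idea, the sketch below loads a factory class by name from a plain key/value configuration. Everything in it is hypothetical: SubmissionClient, SubmissionClientFactory, NotifyingClientFactory, ClientFactoryLoader and the key `client.factory.class` are made-up names for this example and are not Flink classes or Flink options.

```java
import java.util.Map;

// Hypothetical minimal client abstraction; Flink's real ClusterClient is much larger.
interface SubmissionClient {
    String submit(String jobName);
}

// Hypothetical factory contract that a deployment could implement.
interface SubmissionClientFactory {
    SubmissionClient create();
}

// Example factory an integrator might ship: wraps submission with its own callback.
final class NotifyingClientFactory implements SubmissionClientFactory {
    @Override
    public SubmissionClient create() {
        return jobName -> {
            String jobId = "job-" + jobName;          // placeholder for a real submission
            System.out.println("submitted " + jobId); // integrator-specific hook
            return jobId;
        };
    }
}

final class ClientFactoryLoader {
    // Hypothetical configuration key, not an existing Flink option.
    static final String FACTORY_KEY = "client.factory.class";

    // Instantiates the factory named in the configuration via its no-arg constructor.
    static SubmissionClientFactory load(Map<String, String> configuration) {
        String className = configuration.get(FACTORY_KEY);
        if (className == null) {
            throw new IllegalArgumentException("Missing configuration key: " + FACTORY_KEY);
        }
        try {
            return Class.forName(className)
                    .asSubclass(SubmissionClientFactory.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + className, e);
        }
    }
}
```

With this shape the execution environment never sees the listener; the trade-off, raised in the reply that follows, is that the integrator has to provide a whole client implementation just to add a callback.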
Hi Till,

IMHO, allowing users to add hooks involves two steps:

1. Provide the hook interface and call these hooks in Flink (ClusterClient) at the right places. This should be done by the framework (Flink).
2. Implement new hooks and add/register them with the framework (Flink).

What I am doing is step 1, which should be done by Flink; step 2 is done by users. But IIUC, your suggestion of using a custom ClusterClient seems to mix these two steps together. Say I'd like to add new hooks: I would have to implement a new custom ClusterClient, add the new hooks, and call them in the custom ClusterClient at the right places.

This doesn't make sense to me. A user who wants to add hooks is not supposed to understand the mechanism of ClusterClient and should not touch ClusterClient. What do you think?

On Tue, Apr 23, 2019 at 4:24 PM Till Rohrmann <[hidden email]> wrote:
Best Regards
Jeff Zhang |
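The two-step split described in this mail can be sketched in a few lines. This is only an illustration under stated assumptions: JobLifecycleHook, HookedSubmitter and RecordingHook are hypothetical names, job ids are plain strings, and the Runnable stands in for the actual remote execution.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Step 1 (framework): the hook interface. String ids keep the sketch self-contained.
interface JobLifecycleHook {
    void onJobSubmitted(String jobId);
    void onJobExecuted(String jobId);
}

// Step 1 (framework): hypothetical stand-in for the component that submits jobs.
// It alone decides where in the lifecycle each hook fires.
final class HookedSubmitter {
    private final List<JobLifecycleHook> hooks = new CopyOnWriteArrayList<>();

    // Registration entry point used by integrators.
    void addHook(JobLifecycleHook hook) {
        hooks.add(hook);
    }

    void submitAndWait(String jobId, Runnable job) {
        for (JobLifecycleHook hook : hooks) {
            hook.onJobSubmitted(jobId);
        }
        job.run(); // placeholder for the actual remote execution
        for (JobLifecycleHook hook : hooks) {
            hook.onJobExecuted(jobId);
        }
    }
}

// Step 2 (user): implements and registers a hook without knowing how submission works.
final class RecordingHook implements JobLifecycleHook {
    final List<String> events = new ArrayList<>();

    @Override
    public void onJobSubmitted(String jobId) {
        events.add("submitted:" + jobId);
    }

    @Override
    public void onJobExecuted(String jobId) {
        events.add("executed:" + jobId);
    }
}
```

The user-side class only depends on the hook interface, which is the separation argued for here; a custom client, by contrast, would require the user to reimplement submitAndWait as well.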
Thanks for the proposal, Jeff. Adding a listener to allow users to handle events during the job lifecycle makes a lot of sense to me.

Here are my two cents.

* How do users specify the listener? *
It is not quite clear to me whether we consider ClusterClient a public interface. From what I understand, ClusterClient is not a public interface right now. In contrast, ExecutionEnvironment is the de facto interface for administrative work. After job submission, it is essentially bound to a job as an administrative handle. Given this current state, I personally find it acceptable to have the listener registered on the ExecutionEnvironment.

* Where should the listener run? *
If the listener runs on the client side, the client has to stay connected to the Flink cluster. This does not quite work if the job is a streaming job. Should we provide the option to run the listener in the JobMaster as well?

* What should be reported to the listener? *
Besides the proposed APIs, does it make sense to also report events such as failover?

* What can the listeners do on notifications? *
If the listeners are expected to do anything to the job, should some helper class for manipulating the job be passed to the listener methods? Otherwise users may not be able to easily take action.

Thanks,
Jiangjie (Becket) Qin

On Wed, Apr 24, 2019 at 2:43 PM Jeff Zhang <[hidden email]> wrote:
|
Hi Becket,

Thanks for your feedback. See my comments inline.

> How do users specify the listener?

What I propose is to register the JobListener on the ExecutionEnvironment. I don't think we should make ClusterClient a public API.

> Where should the listener run?

I don't think it is proper to run the listener in the JobMaster. The listener is user code, and it usually depends on the user's other components, so running it on the client side makes more sense to me.

> What should be reported to the listener?

I am open to adding other methods to this JobListener. But for now, I am afraid the ExecutionEnvironment is not aware of failover, so it is not possible to report failover events.

> What can the listeners do on notifications?

Do you mean passing the JobGraph to these methods, like the following? (I am afraid JobGraph is not a public and stable API, so we should not expose it to users.)

    public interface JobListener {
        void onJobSubmitted(JobGraph graph, JobID jobId);
        void onJobExecuted(JobGraph graph, JobExecutionResult jobResult);
        void onJobCanceled(JobGraph graph, JobID jobId, String savepointPath);
    }

On Thu, Apr 25, 2019 at 7:40 PM Becket Qin <[hidden email]> wrote:
Best Regards
Jeff Zhang |
Hi everybody,

any news on this? For us it would be VERY helpful to have such a feature because we need to call a REST service once a job ends. Right now we do this after env.execute(), but this works only if the job is submitted via the CLI client; the REST client doesn't execute anything after env.execute().

Best,
Flavio

On Thu, Apr 25, 2019 at 3:12 PM Jeff Zhang <[hidden email]> wrote:
|
This issue is another case where we have problems figuring out the boundaries and responsibilities between the ExecutionEnvironments and the ClusterClient.

I believe we should figure this out first, and decide whether the ClusterClient (or anything based on it) should be made public to accommodate use-cases such as this.

Personally, I believe the environments to be overloaded as is and would very much not want more features to be added to them.

On 09/05/2019 10:13, Flavio Pompermaier wrote:
|
Reviving this thread again after I came across FLINK-12214 [1] since there are use cases which might benefit from this feature. Was there some conclusion on public APIs in the meantime? Should we proceed with the discussion here?

Best,
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-12214

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
|
Hi everybody,

I was trying to use the JobListener's onJobExecuted() in my job on Flink 1.11.0, but I can't tell whether the job succeeded or not. The Javadoc of JobListener.onJobExecuted() [1] says "Callback on job execution finished, successfully or unsuccessfully", but I can't find any simple way to infer whether the job finished successfully. Do I need to perform another remote call from the client to get the job details using the job id? I'm quite surprised that the execution result (FINISHED / CANCELED / FAILED) is not available in the JobExecutionResult.

Another strange thing is that jobExecutionResult.getJobExecutionResult() returns itself. Is that correct?

Thanks in advance,
Flavio

[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/execution/JobListener.html

On Fri, Oct 9, 2020 at 1:09 PM Matthias <[hidden email]> wrote:

--
Flavio Pompermaier
Development Department
OKKAM S.r.l.
Tel. +(39) 0461 041809
|
Hi Flavio,

Could this https://issues.apache.org/jira/browse/FLINK-20020 help?

Cheers,
Kostas

On Thu, Nov 5, 2020 at 9:39 PM Flavio Pompermaier <[hidden email]> wrote:
|
I think it's OK. I'd also suggest adding the JobStatus to onJobExecuted() so you can immediately know whether the job finished successfully, failed, or was canceled.

Thanks for the help,
Flavio

On Fri, Nov 6, 2020 at 10:41 AM Kostas Kloudas <[hidden email]> wrote:
|
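On the question of telling success from failure inside onJobExecuted(), a minimal sketch follows. It assumes the callback has the shape `onJobExecuted(JobExecutionResult, Throwable)` with exactly one argument non-null; that shape is an assumption here and should be checked against the JobListener Javadoc for the Flink version in use. JobExecutionResult is a simplified stand-in, and ExecutedCallback and OutcomeRecorder are hypothetical names.

```java
// Simplified stand-in for Flink's JobExecutionResult (assumption for this sketch).
final class JobExecutionResult {
    private final long netRuntimeMillis;
    JobExecutionResult(long netRuntimeMillis) { this.netRuntimeMillis = netRuntimeMillis; }
    long getNetRuntime() { return netRuntimeMillis; }
}

// Assumed callback shape: exactly one of the two arguments is non-null.
interface ExecutedCallback {
    void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable throwable);
}

// Hypothetical listener that turns the (result, throwable) pair into a coarse outcome.
final class OutcomeRecorder implements ExecutedCallback {
    enum Outcome { SUCCEEDED, FAILED_OR_CANCELED }

    private volatile Outcome outcome; // null until the callback has fired

    @Override
    public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable throwable) {
        // A non-null throwable signals an unsuccessful run. This sketch cannot tell
        // FAILED from CANCELED, which is the gap a JobStatus argument would close.
        outcome = (throwable == null) ? Outcome.SUCCEEDED : Outcome.FAILED_OR_CANCELED;
        // A follow-up action, such as notifying a REST service, would go here.
    }

    Outcome outcome() { return outcome; }
}
```

Distinguishing FAILED from CANCELED would still need either a status argument as suggested in this mail or a separate status lookup by job id.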