Hello everybody,
I'm trying to use the JobListener to track when a job finishes (with Flink 1.11.0). It works great, but the log statements inside onJobExecuted are not logged anywhere. Is that normal? Best, Flavio
Actually, what I'm experiencing is that the JobListener is executed successfully if I run my main class from the IDE, while it is not fired at all if I submit the JobGraph of the application to a cluster using the RestClusterClient. Am I doing something wrong? My main class ends with env.execute(), and I call env.registerJobListener() right after creating the execution environment via ExecutionEnvironment.getExecutionEnvironment(). Thanks in advance for any help, Flavio On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <[hidden email]> wrote:
Hi Flavio, thanks for sharing this with the Flink community. Could you please answer the following questions:
- What's the code of your job's main method?
- What cluster backend and application do you use to execute the job?
- Is there anything suspicious in the logs that might be related?
Best, Matthias On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier <[hidden email]> wrote:
See inline. On Fri, 13 Nov 2020, 14:31 Matthias Pohl <[hidden email]> wrote:
It's actually very simple: the main class creates a batch execution environment using ExecutionEnvironment.getExecutionEnvironment(), I register a job listener on the env, and I do some stuff before calling env.execute(). The listener is executed correctly, but if I use the RestClusterClient to submit the JobGraph extracted from that main class (packaged in a jar), the program is executed as usual but the job listener is not called.
I use a standalone session cluster for the moment
No, unfortunately.
I've also verified that the problem persists with a modified version of the WordCount example. If you add the code pasted at the end of this email to the end of its main method, you can verify that the listener is called if you run the program from the IDE, but not if you submit the job with the CLI client using the command

- bin/flink run /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
Maybe this is expected behaviour, but I didn't find any documentation of it (neither in the Javadoc nor on the Flink website, where I can't find any documentation about JobListener at all).

[Code to add to main()]
// required imports:
// import org.apache.flink.api.common.JobExecutionResult;
// import org.apache.flink.core.execution.JobClient;
// import org.apache.flink.core.execution.JobListener;

// emit result
if (params.has("output")) {
    counts.writeAsCsv(params.get("output"), "\n", " ");
    // execute program
    env.registerJobListener(new JobListener() {

        @Override
        public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
            System.out.println("**************** SUBMITTED");
        }

        @Override
        public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
            System.out.println("**************** EXECUTED");
        }
    });
    env.execute("WordCount Example");
} else {
    System.out.println("Printing result to stdout. Use --output to specify output path.");
    counts.print();
}

On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <[hidden email]> wrote:
Is this a bug or a documentation problem? On Sat, 14 Nov 2020, 18:44 Flavio Pompermaier <[hidden email]> wrote:
Hi Flavio, I think I can reproduce what you are reporting (assuming you also pass '--output' to 'flink run'). I am not sure why it behaves like this. I would suggest filing a Jira ticket for this. Best, Andrey On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier <[hidden email]> wrote:
@Flavio, when you say you're using the RestClusterClient, you are not actually using it manually, right? You're just submitting your job via "bin/flink run ...", right? What's the exact invocation of "bin/flink run" that you're using? On 19.11.20 09:29, Andrey Zagrebin wrote:
I have a Spring Boot job server that acts as a broker between our application and a Flink session cluster. To submit a job I use the FlinkRestClient (which, if I'm not wrong, is also the one used by the CLI client for the run action). However, neither method triggers the job listener. On Thu, 19 Nov 2020, 09:39 Aljoscha Krettek <[hidden email]> wrote:
JobListener.onJobExecuted() is only invoked in ExecutionEnvironment.execute() and ContextEnvironment.execute(). If neither of these is in the call chain with that setup, the listener will not be invoked. Also, this would only happen on the client, not on the broker (in your case) or the server (JobManager). Does that help to debug the problem? Aljoscha On 19.11.20 09:49, Flavio Pompermaier wrote:
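To make Aljoscha's point concrete, here is a minimal, stdlib-only Java model. Every class in it (Listener, Cluster, Environment) is a hypothetical stand-in, not a Flink API: the listener list belongs to a client-side environment object, so callbacks fire only when that object's execute() runs, while a job graph handed straight to the cluster triggers nothing.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Stdlib-only model of where job-listener callbacks fire.
 * All names are hypothetical stand-ins, not Flink classes.
 */
public class ListenerCallChainModel {

    /** Simplified stand-in for a JobListener. */
    interface Listener {
        void onSubmitted(String jobId);
        void onExecuted(String jobId);
    }

    /** Stand-in for the JobManager: it accepts job graphs and knows nothing about client listeners. */
    static class Cluster {
        private int submitted = 0;

        String submitJobGraph(String jobGraph) {
            submitted++;
            return "job-" + submitted;
        }
    }

    /** Stand-in for the execution environment: the listener list lives in this client-side object. */
    static class Environment {
        private final Cluster cluster;
        private final List<Listener> listeners = new ArrayList<>();

        Environment(Cluster cluster) {
            this.cluster = cluster;
        }

        void registerJobListener(Listener listener) {
            listeners.add(listener);
        }

        String execute(String jobGraph) {
            String jobId = cluster.submitJobGraph(jobGraph);
            listeners.forEach(l -> l.onSubmitted(jobId));
            // A real client would wait for the job result here before the second callback.
            listeners.forEach(l -> l.onExecuted(jobId));
            return jobId;
        }
    }

    /** Runs both submission paths and returns the callbacks that actually fired. */
    static List<String> runDemo() {
        List<String> events = new ArrayList<>();
        Cluster cluster = new Cluster();

        Environment env = new Environment(cluster);
        env.registerJobListener(new Listener() {
            public void onSubmitted(String jobId) { events.add("SUBMITTED " + jobId); }
            public void onExecuted(String jobId) { events.add("EXECUTED " + jobId); }
        });

        // Path A: the program's own execute() call (IDE run, or a launcher that runs main()).
        env.execute("word-count");

        // Path B: a pre-built job graph handed straight to the cluster; no execute(), so no callbacks.
        cluster.submitJobGraph("word-count");

        return events;
    }

    public static void main(String[] args) {
        runDemo().forEach(System.out::println);
    }
}
```

Running main() prints only the two callbacks from path A ("SUBMITTED job-1" and "EXECUTED job-1"); in this model, path B is the analogue of submitting an already extracted JobGraph through a REST client.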
Hi Flavio and Aljoscha,
Sorry for the late heads-up. I could not actually reproduce the reported problem with 'flink run' and a local standalone cluster on master. I get the expected output with the suggested modification of the WordCount program:

$ bin/start-cluster.sh
$ rm -rf out; bin/flink run flink/flink-examples/flink-examples-batch/target/WordCount.jar --output flink/build-target/out
Executing WordCount example with default input data set.
Use --input to specify file input.
**************** SUBMITTED
Job has been submitted with JobID c454a894d0524ccb69943b95838eea07
Program execution finished
Job with JobID c454a894d0524ccb69943b95838eea07 has finished.
Job Runtime: 139 ms

**************** EXECUTED

Best,
Andrey
On Thu, Nov 19, 2020 at 2:40 PM Aljoscha Krettek <[hidden email]> wrote:
Hi Aljoscha, in my main class, within the jar, I create the env and call env.execute(). The listener is not called if the job is run by the CLI client or the FlinkRestClient, and I don't see anything on the JobManager or TaskManager. To me this is a bug, and you can verify it by attaching a listener to the WordCount example and launching the job with the CLI client. If this is the expected behaviour, it is not documented anywhere. On Thu, 19 Nov 2020, 12:40 Aljoscha Krettek <[hidden email]> wrote:
Which version are you using? I used the exact same commands on Flink 1.11.0 and I didn't get the job listener output. On Thu, 19 Nov 2020, 12:53 Andrey Zagrebin <[hidden email]> wrote:
Hmm, there was this issue: https://issues.apache.org/jira/browse/FLINK-17744 but it should already be fixed in your version. On 19.11.20 12:58, Flavio Pompermaier wrote:
I also tried 1.11.0 and 1.11.2; both work for me. On Thu, Nov 19, 2020 at 3:39 PM Aljoscha Krettek <[hidden email]> wrote:
You're right. I removed my Flink dir, re-extracted it, and now it works. Unfortunately I didn't keep the old version to understand what the difference was, but the error was probably caused by a previous version of WordCount.jar (without the listener) in the Flink lib dir (in another dev session I was experimenting with running the job with the user jar in the lib dir). Sorry for the confusion. Just one last question: is the listener executed on the client or on the job server? This is not entirely clear to me. Best, Flavio On Thu, Nov 19, 2020 at 1:53 PM Andrey Zagrebin <[hidden email]> wrote:
I think the problem is that my REST service submits the job to the Flink standalone cluster and responds to the client with the submitted job ID. To achieve this, I was using the RestClusterClient<StandaloneClusterId>, because with it I can use the following code and retrieve the JobID:

(1) JobID flinkJobId = client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get().getJobID();

Unfortunately this does not activate the job listener (which is quite surprising to me... I thought such a listener was triggered by the JobManager). So, after Aljoscha's answer, I took a deeper look into the Flink CLI code, and what it does is basically this:

(2) ClientUtils.executeProgram(new DefaultExecutorServiceLoader(), flinkConf, packagedProgram, false, false);

That works as expected (I wasn't aware of the ThreadLocal mechanism used by ContextEnvironment and StreamContextEnvironment: a very advanced programming technique), but it does not let me get back the job ID that I need. I can live with that, because I can save the Flink job ID in an external service when the job listener's onJobSubmitted method is triggered, but I find this mechanism quite odd. So my question is: is there a simple way to achieve my goal? Am I doing something wrong? At the moment I had to implement a job-status polling thread after line (1), but this looks like a workaround to me. Best, Flavio On Thu, Nov 19, 2020 at 4:28 PM Flavio Pompermaier <[hidden email]> wrote:
>
> Best,
> Flavio
>
> On Thu, Nov 19, 2020 at 1:53 PM Andrey Zagrebin <[hidden email]> wrote:
> I also tried 1.11.0 and 1.11.2; both work for me.
>
> On Thu, Nov 19, 2020 at 3:39 PM Aljoscha Krettek <[hidden email]> wrote:
> Hmm, there was this issue: https://issues.apache.org/jira/browse/FLINK-17744 But it should be fixed in your version.
>
> On 19.11.20 12:58, Flavio Pompermaier wrote:
> Which version are you using? I used the exact same commands on Flink 1.11.0 and I didn't get the job listener output.
>
> On Thu, Nov 19, 2020, 12:53 Andrey Zagrebin <[hidden email]> wrote:
> Hi Flavio and Aljoscha,
>
> Sorry for the late heads-up. I could not actually reproduce the reported problem with 'flink run' and a local standalone cluster on master. I get the expected output with the suggested modification of the WordCount program:
>
> $ bin/start-cluster.sh
>
> $ rm -rf out; bin/flink run flink/flink-examples/flink-examples-batch/target/WordCount.jar --output flink/build-target/out
>
> Executing WordCount example with default input data set.
> Use --input to specify file input.
> **************** SUBMITTED
> Job has been submitted with JobID c454a894d0524ccb69943b95838eea07
> Program execution finished
> Job with JobID c454a894d0524ccb69943b95838eea07 has finished.
> Job Runtime: 139 ms
>
> **************** EXECUTED
>
> Best,
> Andrey
>
> On Thu, Nov 19, 2020 at 2:40 PM Aljoscha Krettek <[hidden email]> wrote:
> JobListener.onJobExecuted() is only invoked in ExecutionEnvironment.execute() and ContextEnvironment.execute(). If neither of these is in the call chain with that setup, then the listener will not be invoked.
>
> Also, this would only happen on the client, not on the broker (in your case) or the server (JobManager).
>
> Does that help to debug the problem?
>
> Aljoscha
>
> On 19.11.20 09:49, Flavio Pompermaier wrote:
> I have a Spring Boot job server that acts as a broker between our application and a Flink session cluster. To submit a job I use the FlinkRestClient (which is also the one used by the CLI client when I use the run action, if I'm not wrong). However, neither method triggers the job listener.
>
> On Thu, Nov 19, 2020, 09:39 Aljoscha Krettek <[hidden email]> wrote:
> @Flavio, when you say you're using the RestClusterClient, you are not actually using it manually, right? You're just submitting your job via "bin/flink run ...", right?
>
> What's the exact invocation of "bin/flink run" that you're using?
>
> On 19.11.20 09:29, Andrey Zagrebin wrote:
> Hi Flavio,
>
> I think I can reproduce what you are reporting (assuming you also pass '--output' to 'flink run'). I am not sure why it behaves like this. I would suggest filing a Jira ticket for this.
>
> Best,
> Andrey
>
> On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier <[hidden email]> wrote:
> Is this a bug, or is it a documentation problem?
>
> On Sat, Nov 14, 2020, 18:44 Flavio Pompermaier <[hidden email]> wrote:
> I've also verified that the problem persists when using a modified version of the WordCount class.
> If you add the code pasted at the end of this email to the end of its main method, you can verify that the listener is called if you run the program from the IDE, but it's not called if you submit the job with the CLI client using the command
>
> - bin/flink run /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
>
> Maybe this is an expected result, but I didn't find any documentation of this behaviour (neither in the Javadoc nor on the Flink website, where I can't find any documentation about JobListener at all).
>
> [Code to add to main()]
> // requires: import org.apache.flink.api.common.JobExecutionResult;
> //           import org.apache.flink.core.execution.JobClient;
> //           import org.apache.flink.core.execution.JobListener;
> // emit result
> if (params.has("output")) {
>     counts.writeAsCsv(params.get("output"), "\n", " ");
>     // the listener must be registered before execute() is called
>     env.registerJobListener(new JobListener() {
>
>         @Override
>         public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
>             System.out.println("**************** SUBMITTED");
>         }
>
>         @Override
>         public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
>             System.out.println("**************** EXECUTED");
>         }
>     });
>     // execute program
>     env.execute("WordCount Example");
> } else {
>     System.out.println("Printing result to stdout. Use --output to specify output path.");
>     counts.print();
> }
|
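Aljoscha's explanation (the listener fires only from inside execute()) and Flavio's remark about the ThreadLocal used by ContextEnvironment can be illustrated with a small, self-contained model. Everything below (SketchEnvironment, SketchClient, SketchListener) is hypothetical and is not Flink's API: it only mimics the idea that the submitting client installs a per-thread environment factory before running the user program, so that listeners registered in main() are notified when execute() runs. The sketch deliberately ignores how a JobGraph is extracted from a jar; path (1) is simply modelled as handing a prebuilt job to the cluster.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical listener interface (a stand-in for a job listener).
interface SketchListener {
    void onJobSubmitted(String jobId);
    void onJobExecuted(String jobId);
}

// Hypothetical environment that picks up a per-thread "context" factory.
class SketchEnvironment {
    // Per-thread factory, installed by the submitting client before user code runs.
    private static final ThreadLocal<Supplier<SketchEnvironment>> CONTEXT = new ThreadLocal<>();

    private final List<SketchListener> listeners = new ArrayList<>();
    private final List<String> cluster; // stands in for the remote session cluster

    SketchEnvironment(List<String> cluster) {
        this.cluster = cluster;
    }

    static void installContext(Supplier<SketchEnvironment> factory) {
        CONTEXT.set(factory);
    }

    static void clearContext() {
        CONTEXT.remove();
    }

    static SketchEnvironment getExecutionEnvironment() {
        Supplier<SketchEnvironment> factory = CONTEXT.get();
        // No context installed (e.g. main() started from the IDE): use a local "cluster".
        return factory != null ? factory.get() : new SketchEnvironment(new ArrayList<>());
    }

    void registerJobListener(SketchListener listener) {
        listeners.add(listener);
    }

    // The only place where listeners are notified.
    void execute(String jobId) {
        cluster.add(jobId);
        for (SketchListener l : listeners) l.onJobSubmitted(jobId);
        for (SketchListener l : listeners) l.onJobExecuted(jobId);
    }
}

// Hypothetical client offering the two submission paths discussed in the thread.
class SketchClient {
    static final List<String> cluster = new ArrayList<>();
    static final List<String> events = new ArrayList<>();

    // Stand-in for the user's main(): register a listener, then call execute().
    static void userProgram(String jobId) {
        SketchEnvironment env = SketchEnvironment.getExecutionEnvironment();
        env.registerJobListener(new SketchListener() {
            @Override public void onJobSubmitted(String id) { events.add("SUBMITTED " + id); }
            @Override public void onJobExecuted(String id) { events.add("EXECUTED " + id); }
        });
        env.execute(jobId);
    }

    // Like path (2): run the user program inside an installed context.
    static void runWithContext(String jobId) {
        SketchEnvironment.installContext(() -> new SketchEnvironment(cluster));
        try {
            userProgram(jobId);
        } finally {
            SketchEnvironment.clearContext();
        }
    }

    // Like path (1): hand a prebuilt job straight to the cluster; no execute(), no listeners.
    static void submitPrebuiltGraph(String jobId) {
        cluster.add(jobId);
    }
}
```

Calling `SketchClient.runWithContext("job-1")` and then `SketchClient.submitPrebuiltGraph("job-2")` leaves both jobs on the "cluster", but only job-1 produces SUBMITTED/EXECUTED events, which is the asymmetry Flavio observed between (2) and (1).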
My final analysis is that the RestClusterClient lacks many methods (jarUpload, jarRun and getExceptions, for example) and that submitJob (and the JobSubmitHandler endpoint) is buggy or should be deprecated, because it does not call the job listeners. Indeed, if the JarRunHandler endpoint is invoked (e.g. from the Web UI), the job listeners are invoked correctly. In order to do what I want, I see the following options (correct me if I'm wrong):

Thanks in advance for any support,
Flavio |
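Since the missing RestClusterClient methods correspond to endpoints of Flink's REST API, one option is to call those endpoints directly. The following is a minimal sketch using the JDK's java.net.http (Java 11+) that only builds the requests for `POST /jars/:jarid/run`, `GET /jobs/:jobid`, `GET /jobs/:jobid/exceptions` and `DELETE /jars/:jarid`. The class name SketchFlinkRest, the base URL, the jar ID and the entry class are hypothetical; the multipart jar upload (`POST /jars/upload`) is omitted, and nothing here is meant to imply anything about when job listeners fire.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper: builds (but does not send) requests for a few Flink REST endpoints.
class SketchFlinkRest {
    private final String baseUrl; // e.g. "http://localhost:8081" (assumed JobManager REST address)

    SketchFlinkRest(String baseUrl) {
        // Strip a trailing slash so that paths concatenate cleanly.
        this.baseUrl = baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl;
    }

    private HttpRequest.Builder request(String path) {
        return HttpRequest.newBuilder(URI.create(baseUrl + path));
    }

    // POST /jars/:jarid/run runs a previously uploaded jar.
    HttpRequest runJar(String jarId, String entryClass) {
        // Minimal JSON body; entryClass is assumed to contain no characters that need escaping.
        String body = "{\"entryClass\":\"" + entryClass + "\"}";
        return request("/jars/" + jarId + "/run")
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    // GET /jobs/:jobid returns job details, including its state.
    HttpRequest jobDetails(String jobId) {
        return request("/jobs/" + jobId).GET().build();
    }

    // GET /jobs/:jobid/exceptions returns the exceptions recorded for the job.
    HttpRequest jobExceptions(String jobId) {
        return request("/jobs/" + jobId + "/exceptions").GET().build();
    }

    // DELETE /jars/:jarid removes an uploaded jar.
    HttpRequest deleteJar(String jarId) {
        return request("/jars/" + jarId).DELETE().build();
    }
}
```

Sending is left to the caller, e.g. `HttpClient.newHttpClient().send(rest.runJar(jarId, entryClass), HttpResponse.BodyHandlers.ofString())`; the JSON response of the run endpoint should contain the `jobid`, but parsing it is not shown here.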
On 20.11.20 22:09, Flavio Pompermaier wrote:
> To achieve this, I was using the RestClusterClient<StandaloneClusterId> because with that I can use the following code and retrieve the JobID:
>
> (1) JobID flinkJobId = client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get().getJobID();

All you want to do is get the JobID, correct? If so, you can just call `jobGraph.getJobID()`. The job ID is not assigned by the cluster; it is actually set on the client side, on the JobGraph object. Does that help in your case?

A general comment on your other questions: yes, the listener logic is only used when using the environments. It's not integrated with the RestClusterClient, which is considered more of an internal implementation detail.

Aljoscha |
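Aljoscha's point that the job ID already exists on the client, before submission, can be modelled with the sketch below. SketchJobId, SketchJobGraph and SketchCluster are hypothetical stand-ins for Flink's JobID, JobGraph and a session cluster, not Flink classes; generating a random 128-bit ID rendered as 32 hex characters is an assumption chosen to resemble the JobID printed in Andrey's output.

```java
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.Map;

// Hypothetical 128-bit job id, generated on the client.
final class SketchJobId {
    private static final SecureRandom RANDOM = new SecureRandom();
    private final long upper;
    private final long lower;

    private SketchJobId(long upper, long lower) {
        this.upper = upper;
        this.lower = lower;
    }

    static SketchJobId generate() {
        return new SketchJobId(RANDOM.nextLong(), RANDOM.nextLong());
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SketchJobId)) return false;
        SketchJobId other = (SketchJobId) o;
        return upper == other.upper && lower == other.lower;
    }

    @Override
    public int hashCode() {
        return 31 * Long.hashCode(upper) + Long.hashCode(lower);
    }

    // 32 lowercase hex characters (each long is formatted as unsigned, zero-padded).
    @Override
    public String toString() {
        return String.format("%016x%016x", upper, lower);
    }
}

// Hypothetical job graph: the id is fixed when the object is created on the client.
final class SketchJobGraph {
    private final SketchJobId jobId = SketchJobId.generate();
    private final String name;

    SketchJobGraph(String name) {
        this.name = name;
    }

    SketchJobId getJobID() {
        return jobId;
    }

    String getName() {
        return name;
    }
}

// Hypothetical cluster: it stores the job under the id it receives and does not assign a new one.
final class SketchCluster {
    private final Map<SketchJobId, String> jobs = new HashMap<>();

    SketchJobId submit(SketchJobGraph graph) {
        jobs.put(graph.getJobID(), graph.getName());
        return graph.getJobID();
    }

    boolean knows(SketchJobId id) {
        return jobs.containsKey(id);
    }
}
```

In this model the REST service could read `graph.getJobID()` and respond to its caller without waiting for the submission future, which is the shortcut Aljoscha suggests for the real JobGraph.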
Thank you Aljoscha, now that's clearer! I didn't know that jobGraph.getJobID() was the solution for my use case; I was convinced that the job ID was assigned by the cluster! And to me it's really weird that the job listener is not called by submitJob. This should probably be documented, at least.
In the meantime I extended the RestClusterClient a little bit. Do you think it would be worth opening a PR to add some unimplemented methods? For example, I added:
- public JobExceptionsInfo getFlinkJobExceptionsInfo(JobID flinkJobId);
- public EmptyResponseBody deleteJar(String jarFileName);
- public boolean isJobRunning(JobID fjid);
- public JarUploadResponseBody uploadJar(Path uploadedFile);
and I was also going to add jarRun.

Let me know,
Flavio

Flavio Pompermaier
Development Department
OKKAM S.r.l.
Tel. +(39) 0461 041809 |
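For the job-status polling workaround (or an isJobRunning-style helper), a bounded polling loop could look like the minimal sketch below. The status source is an assumption: any `Supplier<String>` that returns Flink job state names, for example one backed by a REST call that is not shown here. The class name SketchJobStatusPoller is hypothetical, and treating FINISHED, CANCELED and FAILED as the terminal states is a simplification.

```java
import java.time.Duration;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical poller: waits until a job reaches a terminal state, with a bounded number of polls.
final class SketchJobStatusPoller {
    // States after which the job will not change any more (simplified).
    private static final Set<String> TERMINAL = Set.of("FINISHED", "CANCELED", "FAILED");

    static String awaitTermination(Supplier<String> statusSource, Duration interval, int maxPolls) {
        for (int i = 0; i < maxPolls; i++) {
            String status = statusSource.get();
            if (TERMINAL.contains(status)) {
                return status;
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and stop polling.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while polling job status", e);
            }
        }
        throw new IllegalStateException("job did not terminate after " + maxPolls + " polls");
    }
}
```

Bounding the number of polls keeps a stuck job from blocking the broker thread forever; a scheduled executor would avoid dedicating a thread per job, at the cost of a slightly more complex callback structure.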