Logs of JobExecutionListener

classic Classic list List threaded Threaded
28 messages Options
12
Reply | Threaded
Open this post in threaded view
|

Logs of JobExecutionListener

Flavio Pompermaier
Hello everybody,
I'm trying to use the JobListener to track when a job finishes (with Flink 1.11.0). 
It works great but I have the problem that logs inside the onJobExecuted are not logged anywhere..is it normal?

Best,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Logs of JobExecutionListener

Flavio Pompermaier
Actually what I'm experiencing is that the JobListener is executed successfully if I run my main class from the IDE, while the job listener is not fired at all if I submit the JobGraph of the application to a cluster using the RestClusterClient..
Am I doing something wrong?

My main class ends with the env.execute() and i do env.registerJobListener() when I create the Exceution env via ExecutionEnvironment.getExecutionEnvironment().

Thanks in advance for any help,
Flavio

On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <[hidden email]> wrote:
Hello everybody,
I'm trying to use the JobListener to track when a job finishes (with Flink 1.11.0). 
It works great but I have the problem that logs inside the onJobExecuted are not logged anywhere..is it normal?

Best,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Logs of JobExecutionListener

Matthias
Hi Flavio,
thanks for sharing this with the Flink community. Could you answer the following questions, please:
- What's the code of your Job's main method?
- What cluster backend and application do you use to execute the job?
- Is there anything suspicious you can find in the logs that might be related?

Best,
Matthias

On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier <[hidden email]> wrote:
Actually what I'm experiencing is that the JobListener is executed successfully if I run my main class from the IDE, while the job listener is not fired at all if I submit the JobGraph of the application to a cluster using the RestClusterClient..
Am I doing something wrong?

My main class ends with the env.execute() and i do env.registerJobListener() when I create the Exceution env via ExecutionEnvironment.getExecutionEnvironment().

Thanks in advance for any help,
Flavio

On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <[hidden email]> wrote:
Hello everybody,
I'm trying to use the JobListener to track when a job finishes (with Flink 1.11.0). 
It works great but I have the problem that logs inside the onJobExecuted are not logged anywhere..is it normal?

Best,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Logs of JobExecutionListener

Flavio Pompermaier
see inline

Il ven 13 nov 2020, 14:31 Matthias Pohl <[hidden email]> ha scritto:
Hi Flavio,
thanks for sharing this with the Flink community. Could you answer the following questions, please:
- What's the code of your Job's main method?

it's actually very simple...the main class creates a batch execution env using ExecutionEnvironment.getExecutionEnvironment(), I register a job listener to the env and I do some stuff before calling env.execute().
The listener is executed correctly but if I use the RestClusterClient to sibmit the jobGraph exyracted from that main contained in a jar, the program is executed as usual but the job listener is not called.

- What cluster backend and application do you use to execute the job?

I use a standalone session cluster for the moment

- Is there anything suspicious you can find in the logs that might be related?

no unfortunately..


Best,
Matthias

On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier <[hidden email]> wrote:
Actually what I'm experiencing is that the JobListener is executed successfully if I run my main class from the IDE, while the job listener is not fired at all if I submit the JobGraph of the application to a cluster using the RestClusterClient..
Am I doing something wrong?

My main class ends with the env.execute() and i do env.registerJobListener() when I create the Exceution env via ExecutionEnvironment.getExecutionEnvironment().

Thanks in advance for any help,
Flavio

On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <[hidden email]> wrote:
Hello everybody,
I'm trying to use the JobListener to track when a job finishes (with Flink 1.11.0). 
It works great but I have the problem that logs inside the onJobExecuted are not logged anywhere..is it normal?

Best,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Logs of JobExecutionListener

Flavio Pompermaier
I've also verified that the problem persist also using a modified version of the WordCount class.
If you add the code pasted at the end of this email at the end of its main method you can verify that the listener is called if you run the program from the IDE, but it's not called if you submit the job using the CLI client using the command
  • bin/flink run /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
Maybe this is an expected result but I didn't find any documentation of this behaviour (neither in the Javadoc or in the flink web site, where I can't find any documentation about JobListener at all).

[Code to add to main()]
    // emit result
    if (params.has("output")) {
      counts.writeAsCsv(params.get("output"), "\n", " ");
      // execute program
      env.registerJobListener(new JobListener() {

        @Override
        public void onJobSubmitted(JobClient arg0, Throwable arg1) {
          System.out.println("**************** SUBMITTED");
        }

        @Override
        public void onJobExecuted(JobExecutionResult arg0, Throwable arg1) {
          System.out.println("**************** EXECUTED");
        }
      });
      env.execute("WordCount Example");
    } else {
      System.out.println("Printing result to stdout. Use --output to specify output path.");
      counts.print();
    }

On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <[hidden email]> wrote:
see inline

Il ven 13 nov 2020, 14:31 Matthias Pohl <[hidden email]> ha scritto:
Hi Flavio,
thanks for sharing this with the Flink community. Could you answer the following questions, please:
- What's the code of your Job's main method?

it's actually very simple...the main class creates a batch execution env using ExecutionEnvironment.getExecutionEnvironment(), I register a job listener to the env and I do some stuff before calling env.execute().
The listener is executed correctly but if I use the RestClusterClient to sibmit the jobGraph exyracted from that main contained in a jar, the program is executed as usual but the job listener is not called.

- What cluster backend and application do you use to execute the job?

I use a standalone session cluster for the moment

- Is there anything suspicious you can find in the logs that might be related?

no unfortunately..


Best,
Matthias

On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier <[hidden email]> wrote:
Actually what I'm experiencing is that the JobListener is executed successfully if I run my main class from the IDE, while the job listener is not fired at all if I submit the JobGraph of the application to a cluster using the RestClusterClient..
Am I doing something wrong?

My main class ends with the env.execute() and i do env.registerJobListener() when I create the Exceution env via ExecutionEnvironment.getExecutionEnvironment().

Thanks in advance for any help,
Flavio

On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <[hidden email]> wrote:
Hello everybody,
I'm trying to use the JobListener to track when a job finishes (with Flink 1.11.0). 
It works great but I have the problem that logs inside the onJobExecuted are not logged anywhere..is it normal?

Best,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Logs of JobExecutionListener

Flavio Pompermaier
is this a bug or is it a documentation problem...?

Il sab 14 nov 2020, 18:44 Flavio Pompermaier <[hidden email]> ha scritto:
I've also verified that the problem persist also using a modified version of the WordCount class.
If you add the code pasted at the end of this email at the end of its main method you can verify that the listener is called if you run the program from the IDE, but it's not called if you submit the job using the CLI client using the command
  • bin/flink run /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
Maybe this is an expected result but I didn't find any documentation of this behaviour (neither in the Javadoc or in the flink web site, where I can't find any documentation about JobListener at all).

[Code to add to main()]
    // emit result
    if (params.has("output")) {
      counts.writeAsCsv(params.get("output"), "\n", " ");
      // execute program
      env.registerJobListener(new JobListener() {

        @Override
        public void onJobSubmitted(JobClient arg0, Throwable arg1) {
          System.out.println("**************** SUBMITTED");
        }

        @Override
        public void onJobExecuted(JobExecutionResult arg0, Throwable arg1) {
          System.out.println("**************** EXECUTED");
        }
      });
      env.execute("WordCount Example");
    } else {
      System.out.println("Printing result to stdout. Use --output to specify output path.");
      counts.print();
    }

On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <[hidden email]> wrote:
see inline

Il ven 13 nov 2020, 14:31 Matthias Pohl <[hidden email]> ha scritto:
Hi Flavio,
thanks for sharing this with the Flink community. Could you answer the following questions, please:
- What's the code of your Job's main method?

it's actually very simple...the main class creates a batch execution env using ExecutionEnvironment.getExecutionEnvironment(), I register a job listener to the env and I do some stuff before calling env.execute().
The listener is executed correctly but if I use the RestClusterClient to sibmit the jobGraph exyracted from that main contained in a jar, the program is executed as usual but the job listener is not called.

- What cluster backend and application do you use to execute the job?

I use a standalone session cluster for the moment

- Is there anything suspicious you can find in the logs that might be related?

no unfortunately..


Best,
Matthias

On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier <[hidden email]> wrote:
Actually what I'm experiencing is that the JobListener is executed successfully if I run my main class from the IDE, while the job listener is not fired at all if I submit the JobGraph of the application to a cluster using the RestClusterClient..
Am I doing something wrong?

My main class ends with the env.execute() and i do env.registerJobListener() when I create the Exceution env via ExecutionEnvironment.getExecutionEnvironment().

Thanks in advance for any help,
Flavio

On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <[hidden email]> wrote:
Hello everybody,
I'm trying to use the JobListener to track when a job finishes (with Flink 1.11.0). 
It works great but I have the problem that logs inside the onJobExecuted are not logged anywhere..is it normal?

Best,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Logs of JobExecutionListener

Andrey Zagrebin-5
Hi Flavio,

I think I can reproduce what you are reporting (assuming you also pass '--output' to 'flink run').
I am not sure why it behaves like this. I would suggest filing a Jira ticket for this.

Best,
Andrey

On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier <[hidden email]> wrote:
is this a bug or is it a documentation problem...?

Il sab 14 nov 2020, 18:44 Flavio Pompermaier <[hidden email]> ha scritto:
I've also verified that the problem persist also using a modified version of the WordCount class.
If you add the code pasted at the end of this email at the end of its main method you can verify that the listener is called if you run the program from the IDE, but it's not called if you submit the job using the CLI client using the command
  • bin/flink run /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
Maybe this is an expected result but I didn't find any documentation of this behaviour (neither in the Javadoc or in the flink web site, where I can't find any documentation about JobListener at all).

[Code to add to main()]
    // emit result
    if (params.has("output")) {
      counts.writeAsCsv(params.get("output"), "\n", " ");
      // execute program
      env.registerJobListener(new JobListener() {

        @Override
        public void onJobSubmitted(JobClient arg0, Throwable arg1) {
          System.out.println("**************** SUBMITTED");
        }

        @Override
        public void onJobExecuted(JobExecutionResult arg0, Throwable arg1) {
          System.out.println("**************** EXECUTED");
        }
      });
      env.execute("WordCount Example");
    } else {
      System.out.println("Printing result to stdout. Use --output to specify output path.");
      counts.print();
    }

On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <[hidden email]> wrote:
see inline

Il ven 13 nov 2020, 14:31 Matthias Pohl <[hidden email]> ha scritto:
Hi Flavio,
thanks for sharing this with the Flink community. Could you answer the following questions, please:
- What's the code of your Job's main method?

it's actually very simple...the main class creates a batch execution env using ExecutionEnvironment.getExecutionEnvironment(), I register a job listener to the env and I do some stuff before calling env.execute().
The listener is executed correctly but if I use the RestClusterClient to sibmit the jobGraph exyracted from that main contained in a jar, the program is executed as usual but the job listener is not called.

- What cluster backend and application do you use to execute the job?

I use a standalone session cluster for the moment

- Is there anything suspicious you can find in the logs that might be related?

no unfortunately..


Best,
Matthias

On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier <[hidden email]> wrote:
Actually what I'm experiencing is that the JobListener is executed successfully if I run my main class from the IDE, while the job listener is not fired at all if I submit the JobGraph of the application to a cluster using the RestClusterClient..
Am I doing something wrong?

My main class ends with the env.execute() and i do env.registerJobListener() when I create the Exceution env via ExecutionEnvironment.getExecutionEnvironment().

Thanks in advance for any help,
Flavio

On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <[hidden email]> wrote:
Hello everybody,
I'm trying to use the JobListener to track when a job finishes (with Flink 1.11.0). 
It works great but I have the problem that logs inside the onJobExecuted are not logged anywhere..is it normal?

Best,
Flavio
Reply | Threaded
Open this post in threaded view
|

Re: Logs of JobExecutionListener

Aljoscha Krettek
@Flavio, when you're saying you're using the RestClusterClient, you are
not actually using that manually, right? You're just submitting your job
via "bin/flink run ...", right?

What's the exact invocation of "bin/flink run" that you're using?

On 19.11.20 09:29, Andrey Zagrebin wrote:

> Hi Flavio,
>
> I think I can reproduce what you are reporting (assuming you also pass
> '--output' to 'flink run').
> I am not sure why it behaves like this. I would suggest filing a Jira
> ticket for this.
>
> Best,
> Andrey
>
> On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier <[hidden email]>
> wrote:
>
>> is this a bug or is it a documentation problem...?
>>
>> Il sab 14 nov 2020, 18:44 Flavio Pompermaier <[hidden email]> ha
>> scritto:
>>
>>> I've also verified that the problem persist also using a modified version
>>> of the WordCount class.
>>> If you add the code pasted at the end of this email at the end of its
>>> main method you can verify that the listener is called if you run the
>>> program from the IDE, but it's not called if you submit the job using the
>>> CLI client using the command
>>>
>>>     - bin/flink run
>>>     /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
>>>
>>> Maybe this is an expected result but I didn't find any documentation of
>>> this behaviour (neither in the Javadoc or in the flink web site, where I
>>> can't find any documentation about JobListener at all).
>>>
>>> [Code to add to main()]
>>>      // emit result
>>>      if (params.has("output")) {
>>>        counts.writeAsCsv(params.get("output"), "\n", " ");
>>>        // execute program
>>>        env.registerJobListener(new JobListener() {
>>>
>>>          @Override
>>>          public void onJobSubmitted(JobClient arg0, Throwable arg1) {
>>>            System.out.println("**************** SUBMITTED");
>>>          }
>>>
>>>          @Override
>>>          public void onJobExecuted(JobExecutionResult arg0, Throwable
>>> arg1) {
>>>            System.out.println("**************** EXECUTED");
>>>          }
>>>        });
>>>        env.execute("WordCount Example");
>>>      } else {
>>>        System.out.println("Printing result to stdout. Use --output to
>>> specify output path.");
>>>        counts.print();
>>>      }
>>>
>>> On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <[hidden email]>
>>> wrote:
>>>
>>>> see inline
>>>>
>>>> Il ven 13 nov 2020, 14:31 Matthias Pohl <[hidden email]> ha
>>>> scritto:
>>>>
>>>>> Hi Flavio,
>>>>> thanks for sharing this with the Flink community. Could you answer the
>>>>> following questions, please:
>>>>> - What's the code of your Job's main method?
>>>>>
>>>>
>>>> it's actually very simple...the main class creates a batch execution env
>>>> using ExecutionEnvironment.getExecutionEnvironment(), I register a job
>>>> listener to the env and I do some stuff before calling env.execute().
>>>> The listener is executed correctly but if I use the RestClusterClient to
>>>> sibmit the jobGraph exyracted from that main contained in a jar, the
>>>> program is executed as usual but the job listener is not called.
>>>>
>>>> - What cluster backend and application do you use to execute the job?
>>>>>
>>>>
>>>> I use a standalone session cluster for the moment
>>>>
>>>> - Is there anything suspicious you can find in the logs that might be
>>>>> related?
>>>>>
>>>>
>>>> no unfortunately..
>>>>
>>>>
>>>>> Best,
>>>>> Matthias
>>>>>
>>>>> On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier <
>>>>> [hidden email]> wrote:
>>>>>
>>>>>> Actually what I'm experiencing is that the JobListener is executed
>>>>>> successfully if I run my main class from the IDE, while the job listener is
>>>>>> not fired at all if I submit the JobGraph of the application to a cluster
>>>>>> using the RestClusterClient..
>>>>>> Am I doing something wrong?
>>>>>>
>>>>>> My main class ends with the env.execute() and i do
>>>>>> env.registerJobListener() when I create the Exceution env
>>>>>> via ExecutionEnvironment.getExecutionEnvironment().
>>>>>>
>>>>>> Thanks in advance for any help,
>>>>>> Flavio
>>>>>>
>>>>>> On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <
>>>>>> [hidden email]> wrote:
>>>>>>
>>>>>>> Hello everybody,
>>>>>>> I'm trying to use the JobListener to track when a job finishes (with
>>>>>>> Flink 1.11.0).
>>>>>>> It works great but I have the problem that logs inside
>>>>>>> the onJobExecuted are not logged anywhere..is it normal?
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
>>>>>>>
>>>>>>
>

Reply | Threaded
Open this post in threaded view
|

Re: Logs of JobExecutionListener

Flavio Pompermaier
I have a spring boot job server that act as a broker towards our application and a Flink session cluster. To submit a job I use the FlinkRestClient (that is also the one used in the CLI client when I use the run action it if I'm not wrong). However both methods don't trigger the job listener.

Il gio 19 nov 2020, 09:39 Aljoscha Krettek <[hidden email]> ha scritto:
@Flavio, when you're saying you're using the RestClusterClient, you are
not actually using that manually, right? You're just submitting your job
via "bin/flink run ...", right?

What's the exact invocation of "bin/flink run" that you're using?

On 19.11.20 09:29, Andrey Zagrebin wrote:
> Hi Flavio,
>
> I think I can reproduce what you are reporting (assuming you also pass
> '--output' to 'flink run').
> I am not sure why it behaves like this. I would suggest filing a Jira
> ticket for this.
>
> Best,
> Andrey
>
> On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier <[hidden email]>
> wrote:
>
>> is this a bug or is it a documentation problem...?
>>
>> Il sab 14 nov 2020, 18:44 Flavio Pompermaier <[hidden email]> ha
>> scritto:
>>
>>> I've also verified that the problem persist also using a modified version
>>> of the WordCount class.
>>> If you add the code pasted at the end of this email at the end of its
>>> main method you can verify that the listener is called if you run the
>>> program from the IDE, but it's not called if you submit the job using the
>>> CLI client using the command
>>>
>>>     - bin/flink run
>>>     /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
>>>
>>> Maybe this is an expected result but I didn't find any documentation of
>>> this behaviour (neither in the Javadoc or in the flink web site, where I
>>> can't find any documentation about JobListener at all).
>>>
>>> [Code to add to main()]
>>>      // emit result
>>>      if (params.has("output")) {
>>>        counts.writeAsCsv(params.get("output"), "\n", " ");
>>>        // execute program
>>>        env.registerJobListener(new JobListener() {
>>>
>>>          @Override
>>>          public void onJobSubmitted(JobClient arg0, Throwable arg1) {
>>>            System.out.println("**************** SUBMITTED");
>>>          }
>>>
>>>          @Override
>>>          public void onJobExecuted(JobExecutionResult arg0, Throwable
>>> arg1) {
>>>            System.out.println("**************** EXECUTED");
>>>          }
>>>        });
>>>        env.execute("WordCount Example");
>>>      } else {
>>>        System.out.println("Printing result to stdout. Use --output to
>>> specify output path.");
>>>        counts.print();
>>>      }
>>>
>>> On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <[hidden email]>
>>> wrote:
>>>
>>>> see inline
>>>>
>>>> Il ven 13 nov 2020, 14:31 Matthias Pohl <[hidden email]> ha
>>>> scritto:
>>>>
>>>>> Hi Flavio,
>>>>> thanks for sharing this with the Flink community. Could you answer the
>>>>> following questions, please:
>>>>> - What's the code of your Job's main method?
>>>>>
>>>>
>>>> it's actually very simple...the main class creates a batch execution env
>>>> using ExecutionEnvironment.getExecutionEnvironment(), I register a job
>>>> listener to the env and I do some stuff before calling env.execute().
>>>> The listener is executed correctly but if I use the RestClusterClient to
>>>> sibmit the jobGraph exyracted from that main contained in a jar, the
>>>> program is executed as usual but the job listener is not called.
>>>>
>>>> - What cluster backend and application do you use to execute the job?
>>>>>
>>>>
>>>> I use a standalone session cluster for the moment
>>>>
>>>> - Is there anything suspicious you can find in the logs that might be
>>>>> related?
>>>>>
>>>>
>>>> no unfortunately..
>>>>
>>>>
>>>>> Best,
>>>>> Matthias
>>>>>
>>>>> On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier <
>>>>> [hidden email]> wrote:
>>>>>
>>>>>> Actually what I'm experiencing is that the JobListener is executed
>>>>>> successfully if I run my main class from the IDE, while the job listener is
>>>>>> not fired at all if I submit the JobGraph of the application to a cluster
>>>>>> using the RestClusterClient..
>>>>>> Am I doing something wrong?
>>>>>>
>>>>>> My main class ends with the env.execute() and i do
>>>>>> env.registerJobListener() when I create the Exceution env
>>>>>> via ExecutionEnvironment.getExecutionEnvironment().
>>>>>>
>>>>>> Thanks in advance for any help,
>>>>>> Flavio
>>>>>>
>>>>>> On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <
>>>>>> [hidden email]> wrote:
>>>>>>
>>>>>>> Hello everybody,
>>>>>>> I'm trying to use the JobListener to track when a job finishes (with
>>>>>>> Flink 1.11.0).
>>>>>>> It works great but I have the problem that logs inside
>>>>>>> the onJobExecuted are not logged anywhere..is it normal?
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
>>>>>>>
>>>>>>
>

Reply | Threaded
Open this post in threaded view
|

Re: Logs of JobExecutionListener

Aljoscha Krettek
JobListener.onJobExecuted() is only invoked in
ExecutionEnvironment.execute() and ContextEnvironment.execute(). If none
of these is still in the call chain with that setup then the listener
will not be invoked.

Also, this would only happen on the client, not on the broker (in your
case) or the server (JobManager).

Does that help to debug the problem?

Aljoscha

On 19.11.20 09:49, Flavio Pompermaier wrote:

> I have a spring boot job server that act as a broker towards our
> application and a Flink session cluster. To submit a job I use the
> FlinkRestClient (that is also the one used in the CLI client when I use the
> run action it if I'm not wrong). However both methods don't trigger the job
> listener.
>
> Il gio 19 nov 2020, 09:39 Aljoscha Krettek <[hidden email]> ha scritto:
>
>> @Flavio, when you're saying you're using the RestClusterClient, you are
>> not actually using that manually, right? You're just submitting your job
>> via "bin/flink run ...", right?
>>
>> What's the exact invocation of "bin/flink run" that you're using?
>>
>> On 19.11.20 09:29, Andrey Zagrebin wrote:
>>> Hi Flavio,
>>>
>>> I think I can reproduce what you are reporting (assuming you also pass
>>> '--output' to 'flink run').
>>> I am not sure why it behaves like this. I would suggest filing a Jira
>>> ticket for this.
>>>
>>> Best,
>>> Andrey
>>>
>>> On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier <[hidden email]
>>>
>>> wrote:
>>>
>>>> is this a bug or is it a documentation problem...?
>>>>
>>>> Il sab 14 nov 2020, 18:44 Flavio Pompermaier <[hidden email]> ha
>>>> scritto:
>>>>
>>>>> I've also verified that the problem persist also using a modified
>> version
>>>>> of the WordCount class.
>>>>> If you add the code pasted at the end of this email at the end of its
>>>>> main method you can verify that the listener is called if you run the
>>>>> program from the IDE, but it's not called if you submit the job using
>> the
>>>>> CLI client using the command
>>>>>
>>>>>      - bin/flink run
>>>>>
>>   /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
>>>>>
>>>>> Maybe this is an expected result but I didn't find any documentation of
>>>>> this behaviour (neither in the Javadoc or in the flink web site, where
>> I
>>>>> can't find any documentation about JobListener at all).
>>>>>
>>>>> [Code to add to main()]
>>>>>       // emit result
>>>>>       if (params.has("output")) {
>>>>>         counts.writeAsCsv(params.get("output"), "\n", " ");
>>>>>         // execute program
>>>>>         env.registerJobListener(new JobListener() {
>>>>>
>>>>>           @Override
>>>>>           public void onJobSubmitted(JobClient arg0, Throwable arg1) {
>>>>>             System.out.println("**************** SUBMITTED");
>>>>>           }
>>>>>
>>>>>           @Override
>>>>>           public void onJobExecuted(JobExecutionResult arg0, Throwable
>>>>> arg1) {
>>>>>             System.out.println("**************** EXECUTED");
>>>>>           }
>>>>>         });
>>>>>         env.execute("WordCount Example");
>>>>>       } else {
>>>>>         System.out.println("Printing result to stdout. Use --output to
>>>>> specify output path.");
>>>>>         counts.print();
>>>>>       }
>>>>>
>>>>> On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <
>> [hidden email]>
>>>>> wrote:
>>>>>
>>>>>> see inline
>>>>>>
>>>>>> Il ven 13 nov 2020, 14:31 Matthias Pohl <[hidden email]> ha
>>>>>> scritto:
>>>>>>
>>>>>>> Hi Flavio,
>>>>>>> thanks for sharing this with the Flink community. Could you answer
>> the
>>>>>>> following questions, please:
>>>>>>> - What's the code of your Job's main method?
>>>>>>>
>>>>>>
>>>>>> it's actually very simple...the main class creates a batch execution
>> env
>>>>>> using ExecutionEnvironment.getExecutionEnvironment(), I register a job
>>>>>> listener to the env and I do some stuff before calling env.execute().
>>>>>> The listener is executed correctly but if I use the RestClusterClient
>> to
>>>>>> sibmit the jobGraph exyracted from that main contained in a jar, the
>>>>>> program is executed as usual but the job listener is not called.
>>>>>>
>>>>>> - What cluster backend and application do you use to execute the job?
>>>>>>>
>>>>>>
>>>>>> I use a standalone session cluster for the moment
>>>>>>
>>>>>> - Is there anything suspicious you can find in the logs that might be
>>>>>>> related?
>>>>>>>
>>>>>>
>>>>>> no unfortunately..
>>>>>>
>>>>>>
>>>>>>> Best,
>>>>>>> Matthias
>>>>>>>
>>>>>>> On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier <
>>>>>>> [hidden email]> wrote:
>>>>>>>
>>>>>>>> Actually what I'm experiencing is that the JobListener is executed
>>>>>>>> successfully if I run my main class from the IDE, while the job
>> listener is
>>>>>>>> not fired at all if I submit the JobGraph of the application to a
>> cluster
>>>>>>>> using the RestClusterClient..
>>>>>>>> Am I doing something wrong?
>>>>>>>>
>>>>>>>> My main class ends with the env.execute() and i do
>>>>>>>> env.registerJobListener() when I create the Exceution env
>>>>>>>> via ExecutionEnvironment.getExecutionEnvironment().
>>>>>>>>
>>>>>>>> Thanks in advance for any help,
>>>>>>>> Flavio
>>>>>>>>
>>>>>>>> On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <
>>>>>>>> [hidden email]> wrote:
>>>>>>>>
>>>>>>>>> Hello everybody,
>>>>>>>>> I'm trying to use the JobListener to track when a job finishes
>> (with
>>>>>>>>> Flink 1.11.0).
>>>>>>>>> It works great but I have the problem that logs inside
>>>>>>>>> the onJobExecuted are not logged anywhere..is it normal?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Flavio
>>>>>>>>>
>>>>>>>>
>>>
>>
>>
>

Reply | Threaded
Open this post in threaded view
|

Re: Logs of JobExecutionListener

Andrey Zagrebin-5
Hi Flavio and Aljoscha,

Sorry for the late heads up. I could not actually reproduce the reported problem with 'flink run' and local standalone cluster on master.
I get the expected output with the suggested modification of WordCount program:

$ bin/start-cluster.sh

$ rm -rf out; bin/flink run flink/flink-examples/flink-examples-batch/target/WordCount.jar --output flink/build-target/out

Executing WordCount example with default input data set.
Use --input to specify file input.
**************** SUBMITTED
Job has been submitted with JobID c454a894d0524ccb69943b95838eea07
Program execution finished
Job with JobID c454a894d0524ccb69943b95838eea07 has finished.
Job Runtime: 139 ms

**************** EXECUTED

Best,
Andrey

On Thu, Nov 19, 2020 at 2:40 PM Aljoscha Krettek <[hidden email]> wrote:
JobListener.onJobExecuted() is only invoked in
ExecutionEnvironment.execute() and ContextEnvironment.execute(). If none
of these is still in the call chain with that setup then the listener
will not be invoked.

Also, this would only happen on the client, not on the broker (in your
case) or the server (JobManager).

Does that help to debug the problem?

Aljoscha

On 19.11.20 09:49, Flavio Pompermaier wrote:
> I have a spring boot job server that act as a broker towards our
> application and a Flink session cluster. To submit a job I use the
> FlinkRestClient (that is also the one used in the CLI client when I use the
> run action it if I'm not wrong). However both methods don't trigger the job
> listener.
>
> Il gio 19 nov 2020, 09:39 Aljoscha Krettek <[hidden email]> ha scritto:
>
>> @Flavio, when you're saying you're using the RestClusterClient, you are
>> not actually using that manually, right? You're just submitting your job
>> via "bin/flink run ...", right?
>>
>> What's the exact invocation of "bin/flink run" that you're using?
>>
>> On 19.11.20 09:29, Andrey Zagrebin wrote:
>>> Hi Flavio,
>>>
>>> I think I can reproduce what you are reporting (assuming you also pass
>>> '--output' to 'flink run').
>>> I am not sure why it behaves like this. I would suggest filing a Jira
>>> ticket for this.
>>>
>>> Best,
>>> Andrey
>>>
>>> On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier <[hidden email]
>>>
>>> wrote:
>>>
>>>> is this a bug or is it a documentation problem...?
>>>>
>>>> Il sab 14 nov 2020, 18:44 Flavio Pompermaier <[hidden email]> ha
>>>> scritto:
>>>>
>>>>> I've also verified that the problem persist also using a modified
>> version
>>>>> of the WordCount class.
>>>>> If you add the code pasted at the end of this email at the end of its
>>>>> main method you can verify that the listener is called if you run the
>>>>> program from the IDE, but it's not called if you submit the job using
>> the
>>>>> CLI client using the command
>>>>>
>>>>>      - bin/flink run
>>>>>
>>   /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
>>>>>
>>>>> Maybe this is an expected result but I didn't find any documentation of
>>>>> this behaviour (neither in the Javadoc or in the flink web site, where
>> I
>>>>> can't find any documentation about JobListener at all).
>>>>>
>>>>> [Code to add to main()]
>>>>>       // emit result
>>>>>       if (params.has("output")) {
>>>>>         counts.writeAsCsv(params.get("output"), "\n", " ");
>>>>>         // execute program
>>>>>         env.registerJobListener(new JobListener() {
>>>>>
>>>>>           @Override
>>>>>           public void onJobSubmitted(JobClient arg0, Throwable arg1) {
>>>>>             System.out.println("**************** SUBMITTED");
>>>>>           }
>>>>>
>>>>>           @Override
>>>>>           public void onJobExecuted(JobExecutionResult arg0, Throwable
>>>>> arg1) {
>>>>>             System.out.println("**************** EXECUTED");
>>>>>           }
>>>>>         });
>>>>>         env.execute("WordCount Example");
>>>>>       } else {
>>>>>         System.out.println("Printing result to stdout. Use --output to
>>>>> specify output path.");
>>>>>         counts.print();
>>>>>       }
>>>>>
>>>>> On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <
>> [hidden email]>
>>>>> wrote:
>>>>>
>>>>>> see inline
>>>>>>
>>>>>> Il ven 13 nov 2020, 14:31 Matthias Pohl <[hidden email]> ha
>>>>>> scritto:
>>>>>>
>>>>>>> Hi Flavio,
>>>>>>> thanks for sharing this with the Flink community. Could you answer
>> the
>>>>>>> following questions, please:
>>>>>>> - What's the code of your Job's main method?
>>>>>>>
>>>>>>
>>>>>> it's actually very simple...the main class creates a batch execution
>> env
>>>>>> using ExecutionEnvironment.getExecutionEnvironment(), I register a job
>>>>>> listener to the env and I do some stuff before calling env.execute().
>>>>>> The listener is executed correctly but if I use the RestClusterClient
>> to
>>>>>> sibmit the jobGraph exyracted from that main contained in a jar, the
>>>>>> program is executed as usual but the job listener is not called.
>>>>>>
>>>>>> - What cluster backend and application do you use to execute the job?
>>>>>>>
>>>>>>
>>>>>> I use a standalone session cluster for the moment
>>>>>>
>>>>>> - Is there anything suspicious you can find in the logs that might be
>>>>>>> related?
>>>>>>>
>>>>>>
>>>>>> no unfortunately..
>>>>>>
>>>>>>
>>>>>>> Best,
>>>>>>> Matthias
>>>>>>>
>>>>>>> On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier <
>>>>>>> [hidden email]> wrote:
>>>>>>>
>>>>>>>> Actually what I'm experiencing is that the JobListener is executed
>>>>>>>> successfully if I run my main class from the IDE, while the job
>> listener is
>>>>>>>> not fired at all if I submit the JobGraph of the application to a
>> cluster
>>>>>>>> using the RestClusterClient..
>>>>>>>> Am I doing something wrong?
>>>>>>>>
>>>>>>>> My main class ends with the env.execute() and i do
>>>>>>>> env.registerJobListener() when I create the Exceution env
>>>>>>>> via ExecutionEnvironment.getExecutionEnvironment().
>>>>>>>>
>>>>>>>> Thanks in advance for any help,
>>>>>>>> Flavio
>>>>>>>>
>>>>>>>> On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <
>>>>>>>> [hidden email]> wrote:
>>>>>>>>
>>>>>>>>> Hello everybody,
>>>>>>>>> I'm trying to use the JobListener to track when a job finishes
>> (with
>>>>>>>>> Flink 1.11.0).
>>>>>>>>> It works great but I have the problem that logs inside
>>>>>>>>> the onJobExecuted are not logged anywhere..is it normal?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Flavio
>>>>>>>>>
>>>>>>>>
>>>
>>
>>
>

Reply | Threaded
Open this post in threaded view
|

Re: Logs of JobExecutionListener

Flavio Pompermaier
In reply to this post by Aljoscha Krettek
Hi Aljoscha, in my main class, within the jar, I create the env and I call env.execute(). The listener is not called if the job is ran by the CLI client or FlinkRestClient, I don't see anything on the job manager or task manager. To me this is a bug and you can verify it attaching a listener to the WordCount example and launching the job using the CLI client. If this is the expected behaviour it is not reported anywhere

Il gio 19 nov 2020, 12:40 Aljoscha Krettek <[hidden email]> ha scritto:
JobListener.onJobExecuted() is only invoked in
ExecutionEnvironment.execute() and ContextEnvironment.execute(). If none
of these is still in the call chain with that setup then the listener
will not be invoked.

Also, this would only happen on the client, not on the broker (in your
case) or the server (JobManager).

Does that help to debug the problem?

Aljoscha

On 19.11.20 09:49, Flavio Pompermaier wrote:
> I have a spring boot job server that act as a broker towards our
> application and a Flink session cluster. To submit a job I use the
> FlinkRestClient (that is also the one used in the CLI client when I use the
> run action it if I'm not wrong). However both methods don't trigger the job
> listener.
>
> Il gio 19 nov 2020, 09:39 Aljoscha Krettek <[hidden email]> ha scritto:
>
>> @Flavio, when you're saying you're using the RestClusterClient, you are
>> not actually using that manually, right? You're just submitting your job
>> via "bin/flink run ...", right?
>>
>> What's the exact invocation of "bin/flink run" that you're using?
>>
>> On 19.11.20 09:29, Andrey Zagrebin wrote:
>>> Hi Flavio,
>>>
>>> I think I can reproduce what you are reporting (assuming you also pass
>>> '--output' to 'flink run').
>>> I am not sure why it behaves like this. I would suggest filing a Jira
>>> ticket for this.
>>>
>>> Best,
>>> Andrey
>>>
>>> On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier <[hidden email]
>>>
>>> wrote:
>>>
>>>> is this a bug or is it a documentation problem...?
>>>>
>>>> Il sab 14 nov 2020, 18:44 Flavio Pompermaier <[hidden email]> ha
>>>> scritto:
>>>>
>>>>> I've also verified that the problem persist also using a modified
>> version
>>>>> of the WordCount class.
>>>>> If you add the code pasted at the end of this email at the end of its
>>>>> main method you can verify that the listener is called if you run the
>>>>> program from the IDE, but it's not called if you submit the job using
>> the
>>>>> CLI client using the command
>>>>>
>>>>>      - bin/flink run
>>>>>
>>   /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
>>>>>
>>>>> Maybe this is an expected result but I didn't find any documentation of
>>>>> this behaviour (neither in the Javadoc or in the flink web site, where
>> I
>>>>> can't find any documentation about JobListener at all).
>>>>>
>>>>> [Code to add to main()]
>>>>>       // emit result
>>>>>       if (params.has("output")) {
>>>>>         counts.writeAsCsv(params.get("output"), "\n", " ");
>>>>>         // execute program
>>>>>         env.registerJobListener(new JobListener() {
>>>>>
>>>>>           @Override
>>>>>           public void onJobSubmitted(JobClient arg0, Throwable arg1) {
>>>>>             System.out.println("**************** SUBMITTED");
>>>>>           }
>>>>>
>>>>>           @Override
>>>>>           public void onJobExecuted(JobExecutionResult arg0, Throwable
>>>>> arg1) {
>>>>>             System.out.println("**************** EXECUTED");
>>>>>           }
>>>>>         });
>>>>>         env.execute("WordCount Example");
>>>>>       } else {
>>>>>         System.out.println("Printing result to stdout. Use --output to
>>>>> specify output path.");
>>>>>         counts.print();
>>>>>       }
>>>>>
>>>>> On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <
>> [hidden email]>
>>>>> wrote:
>>>>>
>>>>>> see inline
>>>>>>
>>>>>> Il ven 13 nov 2020, 14:31 Matthias Pohl <[hidden email]> ha
>>>>>> scritto:
>>>>>>
>>>>>>> Hi Flavio,
>>>>>>> thanks for sharing this with the Flink community. Could you answer
>> the
>>>>>>> following questions, please:
>>>>>>> - What's the code of your Job's main method?
>>>>>>>
>>>>>>
>>>>>> it's actually very simple...the main class creates a batch execution
>> env
>>>>>> using ExecutionEnvironment.getExecutionEnvironment(), I register a job
>>>>>> listener to the env and I do some stuff before calling env.execute().
>>>>>> The listener is executed correctly but if I use the RestClusterClient
>> to
>>>>>> sibmit the jobGraph exyracted from that main contained in a jar, the
>>>>>> program is executed as usual but the job listener is not called.
>>>>>>
>>>>>> - What cluster backend and application do you use to execute the job?
>>>>>>>
>>>>>>
>>>>>> I use a standalone session cluster for the moment
>>>>>>
>>>>>> - Is there anything suspicious you can find in the logs that might be
>>>>>>> related?
>>>>>>>
>>>>>>
>>>>>> no unfortunately..
>>>>>>
>>>>>>
>>>>>>> Best,
>>>>>>> Matthias
>>>>>>>
>>>>>>> On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier <
>>>>>>> [hidden email]> wrote:
>>>>>>>
>>>>>>>> Actually what I'm experiencing is that the JobListener is executed
>>>>>>>> successfully if I run my main class from the IDE, while the job
>> listener is
>>>>>>>> not fired at all if I submit the JobGraph of the application to a
>> cluster
>>>>>>>> using the RestClusterClient..
>>>>>>>> Am I doing something wrong?
>>>>>>>>
>>>>>>>> My main class ends with the env.execute() and i do
>>>>>>>> env.registerJobListener() when I create the Exceution env
>>>>>>>> via ExecutionEnvironment.getExecutionEnvironment().
>>>>>>>>
>>>>>>>> Thanks in advance for any help,
>>>>>>>> Flavio
>>>>>>>>
>>>>>>>> On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <
>>>>>>>> [hidden email]> wrote:
>>>>>>>>
>>>>>>>>> Hello everybody,
>>>>>>>>> I'm trying to use the JobListener to track when a job finishes
>> (with
>>>>>>>>> Flink 1.11.0).
>>>>>>>>> It works great but I have the problem that logs inside
>>>>>>>>> the onJobExecuted are not logged anywhere..is it normal?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Flavio
>>>>>>>>>
>>>>>>>>
>>>
>>
>>
>

Reply | Threaded
Open this post in threaded view
|

Re: Logs of JobExecutionListener

Flavio Pompermaier
In reply to this post by Andrey Zagrebin-5
Which version are you using?
I used the exact same commands on Flink 1.11.0 and I didn't get the job listener output..

Il gio 19 nov 2020, 12:53 Andrey Zagrebin <[hidden email]> ha scritto:
Hi Flavio and Aljoscha,

Sorry for the late heads up. I could not actually reproduce the reported problem with 'flink run' and local standalone cluster on master.
I get the expected output with the suggested modification of WordCount program:

$ bin/start-cluster.sh

$ rm -rf out; bin/flink run flink/flink-examples/flink-examples-batch/target/WordCount.jar --output flink/build-target/out

Executing WordCount example with default input data set.
Use --input to specify file input.
**************** SUBMITTED
Job has been submitted with JobID c454a894d0524ccb69943b95838eea07
Program execution finished
Job with JobID c454a894d0524ccb69943b95838eea07 has finished.
Job Runtime: 139 ms

**************** EXECUTED

Best,
Andrey

On Thu, Nov 19, 2020 at 2:40 PM Aljoscha Krettek <[hidden email]> wrote:
JobListener.onJobExecuted() is only invoked in
ExecutionEnvironment.execute() and ContextEnvironment.execute(). If none
of these is still in the call chain with that setup then the listener
will not be invoked.

Also, this would only happen on the client, not on the broker (in your
case) or the server (JobManager).

Does that help to debug the problem?

Aljoscha

On 19.11.20 09:49, Flavio Pompermaier wrote:
> I have a spring boot job server that act as a broker towards our
> application and a Flink session cluster. To submit a job I use the
> FlinkRestClient (that is also the one used in the CLI client when I use the
> run action it if I'm not wrong). However both methods don't trigger the job
> listener.
>
> Il gio 19 nov 2020, 09:39 Aljoscha Krettek <[hidden email]> ha scritto:
>
>> @Flavio, when you're saying you're using the RestClusterClient, you are
>> not actually using that manually, right? You're just submitting your job
>> via "bin/flink run ...", right?
>>
>> What's the exact invocation of "bin/flink run" that you're using?
>>
>> On 19.11.20 09:29, Andrey Zagrebin wrote:
>>> Hi Flavio,
>>>
>>> I think I can reproduce what you are reporting (assuming you also pass
>>> '--output' to 'flink run').
>>> I am not sure why it behaves like this. I would suggest filing a Jira
>>> ticket for this.
>>>
>>> Best,
>>> Andrey
>>>
>>> On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier <[hidden email]
>>>
>>> wrote:
>>>
>>>> is this a bug or is it a documentation problem...?
>>>>
>>>> Il sab 14 nov 2020, 18:44 Flavio Pompermaier <[hidden email]> ha
>>>> scritto:
>>>>
>>>>> I've also verified that the problem persist also using a modified
>> version
>>>>> of the WordCount class.
>>>>> If you add the code pasted at the end of this email at the end of its
>>>>> main method you can verify that the listener is called if you run the
>>>>> program from the IDE, but it's not called if you submit the job using
>> the
>>>>> CLI client using the command
>>>>>
>>>>>      - bin/flink run
>>>>>
>>   /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
>>>>>
>>>>> Maybe this is an expected result but I didn't find any documentation of
>>>>> this behaviour (neither in the Javadoc or in the flink web site, where
>> I
>>>>> can't find any documentation about JobListener at all).
>>>>>
>>>>> [Code to add to main()]
>>>>>       // emit result
>>>>>       if (params.has("output")) {
>>>>>         counts.writeAsCsv(params.get("output"), "\n", " ");
>>>>>         // execute program
>>>>>         env.registerJobListener(new JobListener() {
>>>>>
>>>>>           @Override
>>>>>           public void onJobSubmitted(JobClient arg0, Throwable arg1) {
>>>>>             System.out.println("**************** SUBMITTED");
>>>>>           }
>>>>>
>>>>>           @Override
>>>>>           public void onJobExecuted(JobExecutionResult arg0, Throwable
>>>>> arg1) {
>>>>>             System.out.println("**************** EXECUTED");
>>>>>           }
>>>>>         });
>>>>>         env.execute("WordCount Example");
>>>>>       } else {
>>>>>         System.out.println("Printing result to stdout. Use --output to
>>>>> specify output path.");
>>>>>         counts.print();
>>>>>       }
>>>>>
>>>>> On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <
>> [hidden email]>
>>>>> wrote:
>>>>>
>>>>>> see inline
>>>>>>
>>>>>> Il ven 13 nov 2020, 14:31 Matthias Pohl <[hidden email]> ha
>>>>>> scritto:
>>>>>>
>>>>>>> Hi Flavio,
>>>>>>> thanks for sharing this with the Flink community. Could you answer
>> the
>>>>>>> following questions, please:
>>>>>>> - What's the code of your Job's main method?
>>>>>>>
>>>>>>
>>>>>> it's actually very simple...the main class creates a batch execution
>> env
>>>>>> using ExecutionEnvironment.getExecutionEnvironment(), I register a job
>>>>>> listener to the env and I do some stuff before calling env.execute().
>>>>>> The listener is executed correctly but if I use the RestClusterClient
>> to
>>>>>> sibmit the jobGraph exyracted from that main contained in a jar, the
>>>>>> program is executed as usual but the job listener is not called.
>>>>>>
>>>>>> - What cluster backend and application do you use to execute the job?
>>>>>>>
>>>>>>
>>>>>> I use a standalone session cluster for the moment
>>>>>>
>>>>>> - Is there anything suspicious you can find in the logs that might be
>>>>>>> related?
>>>>>>>
>>>>>>
>>>>>> no unfortunately..
>>>>>>
>>>>>>
>>>>>>> Best,
>>>>>>> Matthias
>>>>>>>
>>>>>>> On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier <
>>>>>>> [hidden email]> wrote:
>>>>>>>
>>>>>>>> Actually what I'm experiencing is that the JobListener is executed
>>>>>>>> successfully if I run my main class from the IDE, while the job
>> listener is
>>>>>>>> not fired at all if I submit the JobGraph of the application to a
>> cluster
>>>>>>>> using the RestClusterClient..
>>>>>>>> Am I doing something wrong?
>>>>>>>>
>>>>>>>> My main class ends with the env.execute() and i do
>>>>>>>> env.registerJobListener() when I create the Exceution env
>>>>>>>> via ExecutionEnvironment.getExecutionEnvironment().
>>>>>>>>
>>>>>>>> Thanks in advance for any help,
>>>>>>>> Flavio
>>>>>>>>
>>>>>>>> On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <
>>>>>>>> [hidden email]> wrote:
>>>>>>>>
>>>>>>>>> Hello everybody,
>>>>>>>>> I'm trying to use the JobListener to track when a job finishes
>> (with
>>>>>>>>> Flink 1.11.0).
>>>>>>>>> It works great but I have the problem that logs inside
>>>>>>>>> the onJobExecuted are not logged anywhere..is it normal?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Flavio
>>>>>>>>>
>>>>>>>>
>>>
>>
>>
>

Reply | Threaded
Open this post in threaded view
|

Re: Logs of JobExecutionListener

Aljoscha Krettek
Hmm, there was this issue:
https://issues.apache.org/jira/browse/FLINK-17744 But it should be fixed
in your version.

On 19.11.20 12:58, Flavio Pompermaier wrote:

> Which version are you using?
> I used the exact same commands on Flink 1.11.0 and I didn't get the job
> listener output..
>
> Il gio 19 nov 2020, 12:53 Andrey Zagrebin <[hidden email]> ha scritto:
>
>> Hi Flavio and Aljoscha,
>>
>> Sorry for the late heads up. I could not actually reproduce the reported
>> problem with 'flink run' and local standalone cluster on master.
>> I get the expected output with the suggested modification of WordCount
>> program:
>>
>> $ bin/start-cluster.sh
>>
>> $ rm -rf out; bin/flink run
>> flink/flink-examples/flink-examples-batch/target/WordCount.jar --output
>> flink/build-target/out
>>
>> Executing WordCount example with default input data set.
>> Use --input to specify file input.
>> **************** SUBMITTED
>> Job has been submitted with JobID c454a894d0524ccb69943b95838eea07
>> Program execution finished
>> Job with JobID c454a894d0524ccb69943b95838eea07 has finished.
>> Job Runtime: 139 ms
>>
>> **************** EXECUTED
>>
>> Best,
>> Andrey
>>
>> On Thu, Nov 19, 2020 at 2:40 PM Aljoscha Krettek <[hidden email]>
>> wrote:
>>
>>> JobListener.onJobExecuted() is only invoked in
>>> ExecutionEnvironment.execute() and ContextEnvironment.execute(). If none
>>> of these is still in the call chain with that setup then the listener
>>> will not be invoked.
>>>
>>> Also, this would only happen on the client, not on the broker (in your
>>> case) or the server (JobManager).
>>>
>>> Does that help to debug the problem?
>>>
>>> Aljoscha
>>>
>>> On 19.11.20 09:49, Flavio Pompermaier wrote:
>>>> I have a spring boot job server that act as a broker towards our
>>>> application and a Flink session cluster. To submit a job I use the
>>>> FlinkRestClient (that is also the one used in the CLI client when I use
>>> the
>>>> run action it if I'm not wrong). However both methods don't trigger the
>>> job
>>>> listener.
>>>>
>>>> Il gio 19 nov 2020, 09:39 Aljoscha Krettek <[hidden email]> ha
>>> scritto:
>>>>
>>>>> @Flavio, when you're saying you're using the RestClusterClient, you are
>>>>> not actually using that manually, right? You're just submitting your
>>> job
>>>>> via "bin/flink run ...", right?
>>>>>
>>>>> What's the exact invocation of "bin/flink run" that you're using?
>>>>>
>>>>> On 19.11.20 09:29, Andrey Zagrebin wrote:
>>>>>> Hi Flavio,
>>>>>>
>>>>>> I think I can reproduce what you are reporting (assuming you also pass
>>>>>> '--output' to 'flink run').
>>>>>> I am not sure why it behaves like this. I would suggest filing a Jira
>>>>>> ticket for this.
>>>>>>
>>>>>> Best,
>>>>>> Andrey
>>>>>>
>>>>>> On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier <
>>> [hidden email]
>>>>>>
>>>>>> wrote:
>>>>>>
>>>>>>> is this a bug or is it a documentation problem...?
>>>>>>>
>>>>>>> Il sab 14 nov 2020, 18:44 Flavio Pompermaier <[hidden email]>
>>> ha
>>>>>>> scritto:
>>>>>>>
>>>>>>>> I've also verified that the problem persist also using a modified
>>>>> version
>>>>>>>> of the WordCount class.
>>>>>>>> If you add the code pasted at the end of this email at the end of
>>> its
>>>>>>>> main method you can verify that the listener is called if you run
>>> the
>>>>>>>> program from the IDE, but it's not called if you submit the job
>>> using
>>>>> the
>>>>>>>> CLI client using the command
>>>>>>>>
>>>>>>>>       - bin/flink run
>>>>>>>>
>>>>>
>>>   /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
>>>>>>>>
>>>>>>>> Maybe this is an expected result but I didn't find any
>>> documentation of
>>>>>>>> this behaviour (neither in the Javadoc or in the flink web site,
>>> where
>>>>> I
>>>>>>>> can't find any documentation about JobListener at all).
>>>>>>>>
>>>>>>>> [Code to add to main()]
>>>>>>>>        // emit result
>>>>>>>>        if (params.has("output")) {
>>>>>>>>          counts.writeAsCsv(params.get("output"), "\n", " ");
>>>>>>>>          // execute program
>>>>>>>>          env.registerJobListener(new JobListener() {
>>>>>>>>
>>>>>>>>            @Override
>>>>>>>>            public void onJobSubmitted(JobClient arg0, Throwable
>>> arg1) {
>>>>>>>>              System.out.println("**************** SUBMITTED");
>>>>>>>>            }
>>>>>>>>
>>>>>>>>            @Override
>>>>>>>>            public void onJobExecuted(JobExecutionResult arg0,
>>> Throwable
>>>>>>>> arg1) {
>>>>>>>>              System.out.println("**************** EXECUTED");
>>>>>>>>            }
>>>>>>>>          });
>>>>>>>>          env.execute("WordCount Example");
>>>>>>>>        } else {
>>>>>>>>          System.out.println("Printing result to stdout. Use --output
>>> to
>>>>>>>> specify output path.");
>>>>>>>>          counts.print();
>>>>>>>>        }
>>>>>>>>
>>>>>>>> On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <
>>>>> [hidden email]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> see inline
>>>>>>>>>
>>>>>>>>> Il ven 13 nov 2020, 14:31 Matthias Pohl <[hidden email]>
>>> ha
>>>>>>>>> scritto:
>>>>>>>>>
>>>>>>>>>> Hi Flavio,
>>>>>>>>>> thanks for sharing this with the Flink community. Could you answer
>>>>> the
>>>>>>>>>> following questions, please:
>>>>>>>>>> - What's the code of your Job's main method?
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> it's actually very simple...the main class creates a batch
>>> execution
>>>>> env
>>>>>>>>> using ExecutionEnvironment.getExecutionEnvironment(), I register a
>>> job
>>>>>>>>> listener to the env and I do some stuff before calling
>>> env.execute().
>>>>>>>>> The listener is executed correctly but if I use the
>>> RestClusterClient
>>>>> to
>>>>>>>>> sibmit the jobGraph exyracted from that main contained in a jar,
>>> the
>>>>>>>>> program is executed as usual but the job listener is not called.
>>>>>>>>>
>>>>>>>>> - What cluster backend and application do you use to execute the
>>> job?
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I use a standalone session cluster for the moment
>>>>>>>>>
>>>>>>>>> - Is there anything suspicious you can find in the logs that might
>>> be
>>>>>>>>>> related?
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> no unfortunately..
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Matthias
>>>>>>>>>>
>>>>>>>>>> On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier <
>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Actually what I'm experiencing is that the JobListener is
>>> executed
>>>>>>>>>>> successfully if I run my main class from the IDE, while the job
>>>>> listener is
>>>>>>>>>>> not fired at all if I submit the JobGraph of the application to a
>>>>> cluster
>>>>>>>>>>> using the RestClusterClient..
>>>>>>>>>>> Am I doing something wrong?
>>>>>>>>>>>
>>>>>>>>>>> My main class ends with the env.execute() and i do
>>>>>>>>>>> env.registerJobListener() when I create the Exceution env
>>>>>>>>>>> via ExecutionEnvironment.getExecutionEnvironment().
>>>>>>>>>>>
>>>>>>>>>>> Thanks in advance for any help,
>>>>>>>>>>> Flavio
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <
>>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello everybody,
>>>>>>>>>>>> I'm trying to use the JobListener to track when a job finishes
>>>>> (with
>>>>>>>>>>>> Flink 1.11.0).
>>>>>>>>>>>> It works great but I have the problem that logs inside
>>>>>>>>>>>> the onJobExecuted are not logged anywhere..is it normal?
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Flavio
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>

Reply | Threaded
Open this post in threaded view
|

Re: Logs of JobExecutionListener

Andrey Zagrebin-5
I also tried 1.11.0 and 1.11.2, both work for me.

On Thu, Nov 19, 2020 at 3:39 PM Aljoscha Krettek <[hidden email]> wrote:
Hmm, there was this issue:
https://issues.apache.org/jira/browse/FLINK-17744 But it should be fixed
in your version.

On 19.11.20 12:58, Flavio Pompermaier wrote:
> Which version are you using?
> I used the exact same commands on Flink 1.11.0 and I didn't get the job
> listener output..
>
> Il gio 19 nov 2020, 12:53 Andrey Zagrebin <[hidden email]> ha scritto:
>
>> Hi Flavio and Aljoscha,
>>
>> Sorry for the late heads up. I could not actually reproduce the reported
>> problem with 'flink run' and local standalone cluster on master.
>> I get the expected output with the suggested modification of WordCount
>> program:
>>
>> $ bin/start-cluster.sh
>>
>> $ rm -rf out; bin/flink run
>> flink/flink-examples/flink-examples-batch/target/WordCount.jar --output
>> flink/build-target/out
>>
>> Executing WordCount example with default input data set.
>> Use --input to specify file input.
>> **************** SUBMITTED
>> Job has been submitted with JobID c454a894d0524ccb69943b95838eea07
>> Program execution finished
>> Job with JobID c454a894d0524ccb69943b95838eea07 has finished.
>> Job Runtime: 139 ms
>>
>> **************** EXECUTED
>>
>> Best,
>> Andrey
>>
>> On Thu, Nov 19, 2020 at 2:40 PM Aljoscha Krettek <[hidden email]>
>> wrote:
>>
>>> JobListener.onJobExecuted() is only invoked in
>>> ExecutionEnvironment.execute() and ContextEnvironment.execute(). If none
>>> of these is still in the call chain with that setup then the listener
>>> will not be invoked.
>>>
>>> Also, this would only happen on the client, not on the broker (in your
>>> case) or the server (JobManager).
>>>
>>> Does that help to debug the problem?
>>>
>>> Aljoscha
>>>
>>> On 19.11.20 09:49, Flavio Pompermaier wrote:
>>>> I have a spring boot job server that act as a broker towards our
>>>> application and a Flink session cluster. To submit a job I use the
>>>> FlinkRestClient (that is also the one used in the CLI client when I use
>>> the
>>>> run action it if I'm not wrong). However both methods don't trigger the
>>> job
>>>> listener.
>>>>
>>>> Il gio 19 nov 2020, 09:39 Aljoscha Krettek <[hidden email]> ha
>>> scritto:
>>>>
>>>>> @Flavio, when you're saying you're using the RestClusterClient, you are
>>>>> not actually using that manually, right? You're just submitting your
>>> job
>>>>> via "bin/flink run ...", right?
>>>>>
>>>>> What's the exact invocation of "bin/flink run" that you're using?
>>>>>
>>>>> On 19.11.20 09:29, Andrey Zagrebin wrote:
>>>>>> Hi Flavio,
>>>>>>
>>>>>> I think I can reproduce what you are reporting (assuming you also pass
>>>>>> '--output' to 'flink run').
>>>>>> I am not sure why it behaves like this. I would suggest filing a Jira
>>>>>> ticket for this.
>>>>>>
>>>>>> Best,
>>>>>> Andrey
>>>>>>
>>>>>> On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier <
>>> [hidden email]
>>>>>>
>>>>>> wrote:
>>>>>>
>>>>>>> is this a bug or is it a documentation problem...?
>>>>>>>
>>>>>>> Il sab 14 nov 2020, 18:44 Flavio Pompermaier <[hidden email]>
>>> ha
>>>>>>> scritto:
>>>>>>>
>>>>>>>> I've also verified that the problem persist also using a modified
>>>>> version
>>>>>>>> of the WordCount class.
>>>>>>>> If you add the code pasted at the end of this email at the end of
>>> its
>>>>>>>> main method you can verify that the listener is called if you run
>>> the
>>>>>>>> program from the IDE, but it's not called if you submit the job
>>> using
>>>>> the
>>>>>>>> CLI client using the command
>>>>>>>>
>>>>>>>>       - bin/flink run
>>>>>>>>
>>>>>
>>>   /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
>>>>>>>>
>>>>>>>> Maybe this is an expected result but I didn't find any
>>> documentation of
>>>>>>>> this behaviour (neither in the Javadoc or in the flink web site,
>>> where
>>>>> I
>>>>>>>> can't find any documentation about JobListener at all).
>>>>>>>>
>>>>>>>> [Code to add to main()]
>>>>>>>>        // emit result
>>>>>>>>        if (params.has("output")) {
>>>>>>>>          counts.writeAsCsv(params.get("output"), "\n", " ");
>>>>>>>>          // execute program
>>>>>>>>          env.registerJobListener(new JobListener() {
>>>>>>>>
>>>>>>>>            @Override
>>>>>>>>            public void onJobSubmitted(JobClient arg0, Throwable
>>> arg1) {
>>>>>>>>              System.out.println("**************** SUBMITTED");
>>>>>>>>            }
>>>>>>>>
>>>>>>>>            @Override
>>>>>>>>            public void onJobExecuted(JobExecutionResult arg0,
>>> Throwable
>>>>>>>> arg1) {
>>>>>>>>              System.out.println("**************** EXECUTED");
>>>>>>>>            }
>>>>>>>>          });
>>>>>>>>          env.execute("WordCount Example");
>>>>>>>>        } else {
>>>>>>>>          System.out.println("Printing result to stdout. Use --output
>>> to
>>>>>>>> specify output path.");
>>>>>>>>          counts.print();
>>>>>>>>        }
>>>>>>>>
>>>>>>>> On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <
>>>>> [hidden email]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> see inline
>>>>>>>>>
>>>>>>>>> Il ven 13 nov 2020, 14:31 Matthias Pohl <[hidden email]>
>>> ha
>>>>>>>>> scritto:
>>>>>>>>>
>>>>>>>>>> Hi Flavio,
>>>>>>>>>> thanks for sharing this with the Flink community. Could you answer
>>>>> the
>>>>>>>>>> following questions, please:
>>>>>>>>>> - What's the code of your Job's main method?
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> it's actually very simple...the main class creates a batch
>>> execution
>>>>> env
>>>>>>>>> using ExecutionEnvironment.getExecutionEnvironment(), I register a
>>> job
>>>>>>>>> listener to the env and I do some stuff before calling
>>> env.execute().
>>>>>>>>> The listener is executed correctly but if I use the
>>> RestClusterClient
>>>>> to
>>>>>>>>> sibmit the jobGraph exyracted from that main contained in a jar,
>>> the
>>>>>>>>> program is executed as usual but the job listener is not called.
>>>>>>>>>
>>>>>>>>> - What cluster backend and application do you use to execute the
>>> job?
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I use a standalone session cluster for the moment
>>>>>>>>>
>>>>>>>>> - Is there anything suspicious you can find in the logs that might
>>> be
>>>>>>>>>> related?
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> no unfortunately..
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Matthias
>>>>>>>>>>
>>>>>>>>>> On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier <
>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Actually what I'm experiencing is that the JobListener is
>>> executed
>>>>>>>>>>> successfully if I run my main class from the IDE, while the job
>>>>> listener is
>>>>>>>>>>> not fired at all if I submit the JobGraph of the application to a
>>>>> cluster
>>>>>>>>>>> using the RestClusterClient..
>>>>>>>>>>> Am I doing something wrong?
>>>>>>>>>>>
>>>>>>>>>>> My main class ends with the env.execute() and i do
>>>>>>>>>>> env.registerJobListener() when I create the Exceution env
>>>>>>>>>>> via ExecutionEnvironment.getExecutionEnvironment().
>>>>>>>>>>>
>>>>>>>>>>> Thanks in advance for any help,
>>>>>>>>>>> Flavio
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <
>>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello everybody,
>>>>>>>>>>>> I'm trying to use the JobListener to track when a job finishes
>>>>> (with
>>>>>>>>>>>> Flink 1.11.0).
>>>>>>>>>>>> It works great but I have the problem that logs inside
>>>>>>>>>>>> the onJobExecuted are not logged anywhere..is it normal?
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Flavio
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>

Reply | Threaded
Open this post in threaded view
|

Re: Logs of JobExecutionListener

Flavio Pompermaier
You're right..I removed my flink dir and I re-extracted it and now it
works. Unfortunately I didn't keep the old version to understand what
were the difference but the error was probably caused by the fact that
I had a previous version of the WordCount.jar (without the listener)
in the flink lib dir.. (in another dev session I was experimenting in
running the job having the user jar in the lib dir). Sorry for the
confusion.
Just one last question: is the listener executed on the client or on
the job server? This is not entirely clear to me..

Best,
Flavio

On Thu, Nov 19, 2020 at 1:53 PM Andrey Zagrebin <[hidden email]> wrote:

>
> I also tried 1.11.0 and 1.11.2, both work for me.
>
> On Thu, Nov 19, 2020 at 3:39 PM Aljoscha Krettek <[hidden email]> wrote:
>>
>> Hmm, there was this issue:
>> https://issues.apache.org/jira/browse/FLINK-17744 But it should be fixed
>> in your version.
>>
>> On 19.11.20 12:58, Flavio Pompermaier wrote:
>> > Which version are you using?
>> > I used the exact same commands on Flink 1.11.0 and I didn't get the job
>> > listener output..
>> >
>> > Il gio 19 nov 2020, 12:53 Andrey Zagrebin <[hidden email]> ha scritto:
>> >
>> >> Hi Flavio and Aljoscha,
>> >>
>> >> Sorry for the late heads up. I could not actually reproduce the reported
>> >> problem with 'flink run' and local standalone cluster on master.
>> >> I get the expected output with the suggested modification of WordCount
>> >> program:
>> >>
>> >> $ bin/start-cluster.sh
>> >>
>> >> $ rm -rf out; bin/flink run
>> >> flink/flink-examples/flink-examples-batch/target/WordCount.jar --output
>> >> flink/build-target/out
>> >>
>> >> Executing WordCount example with default input data set.
>> >> Use --input to specify file input.
>> >> **************** SUBMITTED
>> >> Job has been submitted with JobID c454a894d0524ccb69943b95838eea07
>> >> Program execution finished
>> >> Job with JobID c454a894d0524ccb69943b95838eea07 has finished.
>> >> Job Runtime: 139 ms
>> >>
>> >> **************** EXECUTED
>> >>
>> >> Best,
>> >> Andrey
>> >>
>> >> On Thu, Nov 19, 2020 at 2:40 PM Aljoscha Krettek <[hidden email]>
>> >> wrote:
>> >>
>> >>> JobListener.onJobExecuted() is only invoked in
>> >>> ExecutionEnvironment.execute() and ContextEnvironment.execute(). If none
>> >>> of these is still in the call chain with that setup then the listener
>> >>> will not be invoked.
>> >>>
>> >>> Also, this would only happen on the client, not on the broker (in your
>> >>> case) or the server (JobManager).
>> >>>
>> >>> Does that help to debug the problem?
>> >>>
>> >>> Aljoscha
>> >>>
>> >>> On 19.11.20 09:49, Flavio Pompermaier wrote:
>> >>>> I have a spring boot job server that act as a broker towards our
>> >>>> application and a Flink session cluster. To submit a job I use the
>> >>>> FlinkRestClient (that is also the one used in the CLI client when I use
>> >>> the
>> >>>> run action it if I'm not wrong). However both methods don't trigger the
>> >>> job
>> >>>> listener.
>> >>>>
>> >>>> Il gio 19 nov 2020, 09:39 Aljoscha Krettek <[hidden email]> ha
>> >>> scritto:
>> >>>>
>> >>>>> @Flavio, when you're saying you're using the RestClusterClient, you are
>> >>>>> not actually using that manually, right? You're just submitting your
>> >>> job
>> >>>>> via "bin/flink run ...", right?
>> >>>>>
>> >>>>> What's the exact invocation of "bin/flink run" that you're using?
>> >>>>>
>> >>>>> On 19.11.20 09:29, Andrey Zagrebin wrote:
>> >>>>>> Hi Flavio,
>> >>>>>>
>> >>>>>> I think I can reproduce what you are reporting (assuming you also pass
>> >>>>>> '--output' to 'flink run').
>> >>>>>> I am not sure why it behaves like this. I would suggest filing a Jira
>> >>>>>> ticket for this.
>> >>>>>>
>> >>>>>> Best,
>> >>>>>> Andrey
>> >>>>>>
>> >>>>>> On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier <
>> >>> [hidden email]
>> >>>>>>
>> >>>>>> wrote:
>> >>>>>>
>> >>>>>>> is this a bug or is it a documentation problem...?
>> >>>>>>>
>> >>>>>>> Il sab 14 nov 2020, 18:44 Flavio Pompermaier <[hidden email]>
>> >>> ha
>> >>>>>>> scritto:
>> >>>>>>>
>> >>>>>>>> I've also verified that the problem persist also using a modified
>> >>>>> version
>> >>>>>>>> of the WordCount class.
>> >>>>>>>> If you add the code pasted at the end of this email at the end of
>> >>> its
>> >>>>>>>> main method you can verify that the listener is called if you run
>> >>> the
>> >>>>>>>> program from the IDE, but it's not called if you submit the job
>> >>> using
>> >>>>> the
>> >>>>>>>> CLI client using the command
>> >>>>>>>>
>> >>>>>>>>       - bin/flink run
>> >>>>>>>>
>> >>>>>
>> >>>   /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
>> >>>>>>>>
>> >>>>>>>> Maybe this is an expected result but I didn't find any
>> >>> documentation of
>> >>>>>>>> this behaviour (neither in the Javadoc or in the flink web site,
>> >>> where
>> >>>>> I
>> >>>>>>>> can't find any documentation about JobListener at all).
>> >>>>>>>>
>> >>>>>>>> [Code to add to main()]
>> >>>>>>>>        // emit result
>> >>>>>>>>        if (params.has("output")) {
>> >>>>>>>>          counts.writeAsCsv(params.get("output"), "\n", " ");
>> >>>>>>>>          // execute program
>> >>>>>>>>          env.registerJobListener(new JobListener() {
>> >>>>>>>>
>> >>>>>>>>            @Override
>> >>>>>>>>            public void onJobSubmitted(JobClient arg0, Throwable
>> >>> arg1) {
>> >>>>>>>>              System.out.println("**************** SUBMITTED");
>> >>>>>>>>            }
>> >>>>>>>>
>> >>>>>>>>            @Override
>> >>>>>>>>            public void onJobExecuted(JobExecutionResult arg0,
>> >>> Throwable
>> >>>>>>>> arg1) {
>> >>>>>>>>              System.out.println("**************** EXECUTED");
>> >>>>>>>>            }
>> >>>>>>>>          });
>> >>>>>>>>          env.execute("WordCount Example");
>> >>>>>>>>        } else {
>> >>>>>>>>          System.out.println("Printing result to stdout. Use --output
>> >>> to
>> >>>>>>>> specify output path.");
>> >>>>>>>>          counts.print();
>> >>>>>>>>        }
>> >>>>>>>>
>> >>>>>>>> On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <
>> >>>>> [hidden email]>
>> >>>>>>>> wrote:
>> >>>>>>>>
>> >>>>>>>>> see inline
>> >>>>>>>>>
>> >>>>>>>>> Il ven 13 nov 2020, 14:31 Matthias Pohl <[hidden email]>
>> >>> ha
>> >>>>>>>>> scritto:
>> >>>>>>>>>
>> >>>>>>>>>> Hi Flavio,
>> >>>>>>>>>> thanks for sharing this with the Flink community. Could you answer
>> >>>>> the
>> >>>>>>>>>> following questions, please:
>> >>>>>>>>>> - What's the code of your Job's main method?
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> it's actually very simple...the main class creates a batch
>> >>> execution
>> >>>>> env
>> >>>>>>>>> using ExecutionEnvironment.getExecutionEnvironment(), I register a
>> >>> job
>> >>>>>>>>> listener to the env and I do some stuff before calling
>> >>> env.execute().
>> >>>>>>>>> The listener is executed correctly but if I use the
>> >>> RestClusterClient
>> >>>>> to
>> >>>>>>>>> sibmit the jobGraph exyracted from that main contained in a jar,
>> >>> the
>> >>>>>>>>> program is executed as usual but the job listener is not called.
>> >>>>>>>>>
>> >>>>>>>>> - What cluster backend and application do you use to execute the
>> >>> job?
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> I use a standalone session cluster for the moment
>> >>>>>>>>>
>> >>>>>>>>> - Is there anything suspicious you can find in the logs that might
>> >>> be
>> >>>>>>>>>> related?
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> no unfortunately..
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>> Best,
>> >>>>>>>>>> Matthias
>> >>>>>>>>>>
>> >>>>>>>>>> On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier <
>> >>>>>>>>>> [hidden email]> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>>> Actually what I'm experiencing is that the JobListener is
>> >>> executed
>> >>>>>>>>>>> successfully if I run my main class from the IDE, while the job
>> >>>>> listener is
>> >>>>>>>>>>> not fired at all if I submit the JobGraph of the application to a
>> >>>>> cluster
>> >>>>>>>>>>> using the RestClusterClient..
>> >>>>>>>>>>> Am I doing something wrong?
>> >>>>>>>>>>>
>> >>>>>>>>>>> My main class ends with the env.execute() and i do
>> >>>>>>>>>>> env.registerJobListener() when I create the Exceution env
>> >>>>>>>>>>> via ExecutionEnvironment.getExecutionEnvironment().
>> >>>>>>>>>>>
>> >>>>>>>>>>> Thanks in advance for any help,
>> >>>>>>>>>>> Flavio
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <
>> >>>>>>>>>>> [hidden email]> wrote:
>> >>>>>>>>>>>
>> >>>>>>>>>>>> Hello everybody,
>> >>>>>>>>>>>> I'm trying to use the JobListener to track when a job finishes
>> >>>>> (with
>> >>>>>>>>>>>> Flink 1.11.0).
>> >>>>>>>>>>>> It works great but I have the problem that logs inside
>> >>>>>>>>>>>> the onJobExecuted are not logged anywhere..is it normal?
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Best,
>> >>>>>>>>>>>> Flavio
>> >>>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>
>> >>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>>
>> >
>>
Reply | Threaded
Open this post in threaded view
|

Re: Logs of JobExecutionListener

Flavio Pompermaier
I think that the problem is that my REST service submits the job to
the Flink standalone cluster and responds to the client with the
submitted job ID.
To achieve this, I was using the
RestClusterClient<StandaloneClusterId> because with that I can use the
following code and retrieve the JobID:

    (1) JobID flinkJobId =
client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get().getJobID();

Unfortunately this does not activate the job listener (that is quite
surprising to me...I thought that such a listener was triggered by the
JobManager).
So, after Aljoscha answer I take a deeper look into the Flink CLI code
and what it does is basically this:

    (2) ClientUtils.executeProgram(new DefaultExecutorServiceLoader(),
flinkConf, packagedProgram, false, false);

That works as expected (I wasn't aware of the ThreadLocal mechanism
used by the ContextEnvironment and StreamContextEnvironment: a very
advanced programming technique) but it does not allow to get back the
job id that I need..I can live with that because I can save the Flink
Job ID in an external service when the job listener triggers the
onJobSubmitted method but I think this mechanism is quite weird..

So my question is: is there a simple way to achieve my goal? Am I
doing something wrong?
At the moment I had to implement a job-status polling thread after the
line (1) but this looks like a  workaround to me..

Best,
Flavio

On Thu, Nov 19, 2020 at 4:28 PM Flavio Pompermaier <[hidden email]> wrote:

>
> You're right..I removed my flink dir and I re-extracted it and now it
> works. Unfortunately I didn't keep the old version to understand what
> were the difference but the error was probably caused by the fact that
> I had a previous version of the WordCount.jar (without the listener)
> in the flink lib dir.. (in another dev session I was experimenting in
> running the job having the user jar in the lib dir). Sorry for the
> confusion.
> Just one last question: is the listener executed on the client or on
> the job server? This is not entirely clear to me..
>
> Best,
> Flavio
>
> On Thu, Nov 19, 2020 at 1:53 PM Andrey Zagrebin <[hidden email]> wrote:
> >
> > I also tried 1.11.0 and 1.11.2, both work for me.
> >
> > On Thu, Nov 19, 2020 at 3:39 PM Aljoscha Krettek <[hidden email]> wrote:
> >>
> >> Hmm, there was this issue:
> >> https://issues.apache.org/jira/browse/FLINK-17744 But it should be fixed
> >> in your version.
> >>
> >> On 19.11.20 12:58, Flavio Pompermaier wrote:
> >> > Which version are you using?
> >> > I used the exact same commands on Flink 1.11.0 and I didn't get the job
> >> > listener output..
> >> >
> >> > Il gio 19 nov 2020, 12:53 Andrey Zagrebin <[hidden email]> ha scritto:
> >> >
> >> >> Hi Flavio and Aljoscha,
> >> >>
> >> >> Sorry for the late heads up. I could not actually reproduce the reported
> >> >> problem with 'flink run' and local standalone cluster on master.
> >> >> I get the expected output with the suggested modification of WordCount
> >> >> program:
> >> >>
> >> >> $ bin/start-cluster.sh
> >> >>
> >> >> $ rm -rf out; bin/flink run
> >> >> flink/flink-examples/flink-examples-batch/target/WordCount.jar --output
> >> >> flink/build-target/out
> >> >>
> >> >> Executing WordCount example with default input data set.
> >> >> Use --input to specify file input.
> >> >> **************** SUBMITTED
> >> >> Job has been submitted with JobID c454a894d0524ccb69943b95838eea07
> >> >> Program execution finished
> >> >> Job with JobID c454a894d0524ccb69943b95838eea07 has finished.
> >> >> Job Runtime: 139 ms
> >> >>
> >> >> **************** EXECUTED
> >> >>
> >> >> Best,
> >> >> Andrey
> >> >>
> >> >> On Thu, Nov 19, 2020 at 2:40 PM Aljoscha Krettek <[hidden email]>
> >> >> wrote:
> >> >>
> >> >>> JobListener.onJobExecuted() is only invoked in
> >> >>> ExecutionEnvironment.execute() and ContextEnvironment.execute(). If none
> >> >>> of these is still in the call chain with that setup then the listener
> >> >>> will not be invoked.
> >> >>>
> >> >>> Also, this would only happen on the client, not on the broker (in your
> >> >>> case) or the server (JobManager).
> >> >>>
> >> >>> Does that help to debug the problem?
> >> >>>
> >> >>> Aljoscha
> >> >>>
> >> >>> On 19.11.20 09:49, Flavio Pompermaier wrote:
> >> >>>> I have a spring boot job server that act as a broker towards our
> >> >>>> application and a Flink session cluster. To submit a job I use the
> >> >>>> FlinkRestClient (that is also the one used in the CLI client when I use
> >> >>> the
> >> >>>> run action it if I'm not wrong). However both methods don't trigger the
> >> >>> job
> >> >>>> listener.
> >> >>>>
> >> >>>> Il gio 19 nov 2020, 09:39 Aljoscha Krettek <[hidden email]> ha
> >> >>> scritto:
> >> >>>>
> >> >>>>> @Flavio, when you're saying you're using the RestClusterClient, you are
> >> >>>>> not actually using that manually, right? You're just submitting your
> >> >>> job
> >> >>>>> via "bin/flink run ...", right?
> >> >>>>>
> >> >>>>> What's the exact invocation of "bin/flink run" that you're using?
> >> >>>>>
> >> >>>>> On 19.11.20 09:29, Andrey Zagrebin wrote:
> >> >>>>>> Hi Flavio,
> >> >>>>>>
> >> >>>>>> I think I can reproduce what you are reporting (assuming you also pass
> >> >>>>>> '--output' to 'flink run').
> >> >>>>>> I am not sure why it behaves like this. I would suggest filing a Jira
> >> >>>>>> ticket for this.
> >> >>>>>>
> >> >>>>>> Best,
> >> >>>>>> Andrey
> >> >>>>>>
> >> >>>>>> On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier <
> >> >>> [hidden email]
> >> >>>>>>
> >> >>>>>> wrote:
> >> >>>>>>
> >> >>>>>>> is this a bug or is it a documentation problem...?
> >> >>>>>>>
> >> >>>>>>> Il sab 14 nov 2020, 18:44 Flavio Pompermaier <[hidden email]>
> >> >>> ha
> >> >>>>>>> scritto:
> >> >>>>>>>
> >> >>>>>>>> I've also verified that the problem persist also using a modified
> >> >>>>> version
> >> >>>>>>>> of the WordCount class.
> >> >>>>>>>> If you add the code pasted at the end of this email at the end of
> >> >>> its
> >> >>>>>>>> main method you can verify that the listener is called if you run
> >> >>> the
> >> >>>>>>>> program from the IDE, but it's not called if you submit the job
> >> >>> using
> >> >>>>> the
> >> >>>>>>>> CLI client using the command
> >> >>>>>>>>
> >> >>>>>>>>       - bin/flink run
> >> >>>>>>>>
> >> >>>>>
> >> >>>   /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
> >> >>>>>>>>
> >> >>>>>>>> Maybe this is an expected result but I didn't find any
> >> >>> documentation of
> >> >>>>>>>> this behaviour (neither in the Javadoc or in the flink web site,
> >> >>> where
> >> >>>>> I
> >> >>>>>>>> can't find any documentation about JobListener at all).
> >> >>>>>>>>
> >> >>>>>>>> [Code to add to main()]
> >> >>>>>>>>        // emit result
> >> >>>>>>>>        if (params.has("output")) {
> >> >>>>>>>>          counts.writeAsCsv(params.get("output"), "\n", " ");
> >> >>>>>>>>          // execute program
> >> >>>>>>>>          env.registerJobListener(new JobListener() {
> >> >>>>>>>>
> >> >>>>>>>>            @Override
> >> >>>>>>>>            public void onJobSubmitted(JobClient arg0, Throwable
> >> >>> arg1) {
> >> >>>>>>>>              System.out.println("**************** SUBMITTED");
> >> >>>>>>>>            }
> >> >>>>>>>>
> >> >>>>>>>>            @Override
> >> >>>>>>>>            public void onJobExecuted(JobExecutionResult arg0,
> >> >>> Throwable
> >> >>>>>>>> arg1) {
> >> >>>>>>>>              System.out.println("**************** EXECUTED");
> >> >>>>>>>>            }
> >> >>>>>>>>          });
> >> >>>>>>>>          env.execute("WordCount Example");
> >> >>>>>>>>        } else {
> >> >>>>>>>>          System.out.println("Printing result to stdout. Use --output
> >> >>> to
> >> >>>>>>>> specify output path.");
> >> >>>>>>>>          counts.print();
> >> >>>>>>>>        }
> >> >>>>>>>>
> >> >>>>>>>> On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <
> >> >>>>> [hidden email]>
> >> >>>>>>>> wrote:
> >> >>>>>>>>
> >> >>>>>>>>> see inline
> >> >>>>>>>>>
> >> >>>>>>>>> Il ven 13 nov 2020, 14:31 Matthias Pohl <[hidden email]>
> >> >>> ha
> >> >>>>>>>>> scritto:
> >> >>>>>>>>>
> >> >>>>>>>>>> Hi Flavio,
> >> >>>>>>>>>> thanks for sharing this with the Flink community. Could you answer
> >> >>>>> the
> >> >>>>>>>>>> following questions, please:
> >> >>>>>>>>>> - What's the code of your Job's main method?
> >> >>>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>> it's actually very simple...the main class creates a batch
> >> >>> execution
> >> >>>>> env
> >> >>>>>>>>> using ExecutionEnvironment.getExecutionEnvironment(), I register a
> >> >>> job
> >> >>>>>>>>> listener to the env and I do some stuff before calling
> >> >>> env.execute().
> >> >>>>>>>>> The listener is executed correctly but if I use the
> >> >>> RestClusterClient
> >> >>>>> to
> >> >>>>>>>>> sibmit the jobGraph exyracted from that main contained in a jar,
> >> >>> the
> >> >>>>>>>>> program is executed as usual but the job listener is not called.
> >> >>>>>>>>>
> >> >>>>>>>>> - What cluster backend and application do you use to execute the
> >> >>> job?
> >> >>>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>> I use a standalone session cluster for the moment
> >> >>>>>>>>>
> >> >>>>>>>>> - Is there anything suspicious you can find in the logs that might
> >> >>> be
> >> >>>>>>>>>> related?
> >> >>>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>> no unfortunately..
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>>> Best,
> >> >>>>>>>>>> Matthias
> >> >>>>>>>>>>
> >> >>>>>>>>>> On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier <
> >> >>>>>>>>>> [hidden email]> wrote:
> >> >>>>>>>>>>
> >> >>>>>>>>>>> Actually what I'm experiencing is that the JobListener is
> >> >>> executed
> >> >>>>>>>>>>> successfully if I run my main class from the IDE, while the job
> >> >>>>> listener is
> >> >>>>>>>>>>> not fired at all if I submit the JobGraph of the application to a
> >> >>>>> cluster
> >> >>>>>>>>>>> using the RestClusterClient..
> >> >>>>>>>>>>> Am I doing something wrong?
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> My main class ends with the env.execute() and i do
> >> >>>>>>>>>>> env.registerJobListener() when I create the Exceution env
> >> >>>>>>>>>>> via ExecutionEnvironment.getExecutionEnvironment().
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Thanks in advance for any help,
> >> >>>>>>>>>>> Flavio
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <
> >> >>>>>>>>>>> [hidden email]> wrote:
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>> Hello everybody,
> >> >>>>>>>>>>>> I'm trying to use the JobListener to track when a job finishes
> >> >>>>> (with
> >> >>>>>>>>>>>> Flink 1.11.0).
> >> >>>>>>>>>>>> It works great but I have the problem that logs inside
> >> >>>>>>>>>>>> the onJobExecuted are not logged anywhere..is it normal?
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> Best,
> >> >>>>>>>>>>>> Flavio
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>
> >> >>>>>
> >> >>>>>
> >> >>>>
> >> >>>
> >> >>>
> >> >
> >>
Reply | Threaded
Open this post in threaded view
|

Re: Logs of JobExecutionListener

Flavio Pompermaier
My final analysis is that the RestClusterClient lack of many methods (jarUpload, jarRun, getExceptions for example) and that the submitJob (and the JobSubmitHandler endpoint) is bugged or should be deprecated (because it does not call the job listeners).
Indeed, if the JarRunHandler endpoint is invoked (e.b. from the Web UI) the job listeners are invoked correctly. 
In order to do what I want I see the following options (correct me if I'm wrong):
  1. Wait for JobSubmitHandler to be fixed (if the fact that the job listeners are not called is a bug..maybe it is not but this should be documented at least)
  2. Extend the RestClusterClient in order to replace the call to the submitJob with jarUpload + jarRun (+jarDelete maybe)
  3. Obtain a JobID from ClientUtils.executeProgram()..but how to do that is not clear at all to me..is that possibile?
Thanks in advance for any support,
Flavio


On Fri, Nov 20, 2020 at 10:09 PM Flavio Pompermaier <[hidden email]> wrote:
I think that the problem is that my REST service submits the job to
the Flink standalone cluster and responds to the client with the
submitted job ID.
To achieve this, I was using the
RestClusterClient<StandaloneClusterId> because with that I can use the
following code and retrieve the JobID:

    (1) JobID flinkJobId =
client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get().getJobID();

Unfortunately this does not activate the job listener (that is quite
surprising to me...I thought that such a listener was triggered by the
JobManager).
So, after Aljoscha answer I take a deeper look into the Flink CLI code
and what it does is basically this:

    (2) ClientUtils.executeProgram(new DefaultExecutorServiceLoader(),
flinkConf, packagedProgram, false, false);

That works as expected (I wasn't aware of the ThreadLocal mechanism
used by the ContextEnvironment and StreamContextEnvironment: a very
advanced programming technique) but it does not allow to get back the
job id that I need..I can live with that because I can save the Flink
Job ID in an external service when the job listener triggers the
onJobSubmitted method but I think this mechanism is quite weird..

So my question is: is there a simple way to achieve my goal? Am I
doing something wrong?
At the moment I had to implement a job-status polling thread after the
line (1) but this looks like a  workaround to me..

Best,
Flavio

On Thu, Nov 19, 2020 at 4:28 PM Flavio Pompermaier <[hidden email]> wrote:
>
> You're right..I removed my flink dir and I re-extracted it and now it
> works. Unfortunately I didn't keep the old version to understand what
> were the difference but the error was probably caused by the fact that
> I had a previous version of the WordCount.jar (without the listener)
> in the flink lib dir.. (in another dev session I was experimenting in
> running the job having the user jar in the lib dir). Sorry for the
> confusion.
> Just one last question: is the listener executed on the client or on
> the job server? This is not entirely clear to me..
>
> Best,
> Flavio
>
> On Thu, Nov 19, 2020 at 1:53 PM Andrey Zagrebin <[hidden email]> wrote:
> >
> > I also tried 1.11.0 and 1.11.2, both work for me.
> >
> > On Thu, Nov 19, 2020 at 3:39 PM Aljoscha Krettek <[hidden email]> wrote:
> >>
> >> Hmm, there was this issue:
> >> https://issues.apache.org/jira/browse/FLINK-17744 But it should be fixed
> >> in your version.
> >>
> >> On 19.11.20 12:58, Flavio Pompermaier wrote:
> >> > Which version are you using?
> >> > I used the exact same commands on Flink 1.11.0 and I didn't get the job
> >> > listener output..
> >> >
> >> > Il gio 19 nov 2020, 12:53 Andrey Zagrebin <[hidden email]> ha scritto:
> >> >
> >> >> Hi Flavio and Aljoscha,
> >> >>
> >> >> Sorry for the late heads up. I could not actually reproduce the reported
> >> >> problem with 'flink run' and local standalone cluster on master.
> >> >> I get the expected output with the suggested modification of WordCount
> >> >> program:
> >> >>
> >> >> $ bin/start-cluster.sh
> >> >>
> >> >> $ rm -rf out; bin/flink run
> >> >> flink/flink-examples/flink-examples-batch/target/WordCount.jar --output
> >> >> flink/build-target/out
> >> >>
> >> >> Executing WordCount example with default input data set.
> >> >> Use --input to specify file input.
> >> >> **************** SUBMITTED
> >> >> Job has been submitted with JobID c454a894d0524ccb69943b95838eea07
> >> >> Program execution finished
> >> >> Job with JobID c454a894d0524ccb69943b95838eea07 has finished.
> >> >> Job Runtime: 139 ms
> >> >>
> >> >> **************** EXECUTED
> >> >>
> >> >> Best,
> >> >> Andrey
> >> >>
> >> >> On Thu, Nov 19, 2020 at 2:40 PM Aljoscha Krettek <[hidden email]>
> >> >> wrote:
> >> >>
> >> >>> JobListener.onJobExecuted() is only invoked in
> >> >>> ExecutionEnvironment.execute() and ContextEnvironment.execute(). If none
> >> >>> of these is still in the call chain with that setup then the listener
> >> >>> will not be invoked.
> >> >>>
> >> >>> Also, this would only happen on the client, not on the broker (in your
> >> >>> case) or the server (JobManager).
> >> >>>
> >> >>> Does that help to debug the problem?
> >> >>>
> >> >>> Aljoscha
> >> >>>
> >> >>> On 19.11.20 09:49, Flavio Pompermaier wrote:
> >> >>>> I have a spring boot job server that act as a broker towards our
> >> >>>> application and a Flink session cluster. To submit a job I use the
> >> >>>> FlinkRestClient (that is also the one used in the CLI client when I use
> >> >>> the
> >> >>>> run action it if I'm not wrong). However both methods don't trigger the
> >> >>> job
> >> >>>> listener.
> >> >>>>
> >> >>>> Il gio 19 nov 2020, 09:39 Aljoscha Krettek <[hidden email]> ha
> >> >>> scritto:
> >> >>>>
> >> >>>>> @Flavio, when you're saying you're using the RestClusterClient, you are
> >> >>>>> not actually using that manually, right? You're just submitting your
> >> >>> job
> >> >>>>> via "bin/flink run ...", right?
> >> >>>>>
> >> >>>>> What's the exact invocation of "bin/flink run" that you're using?
> >> >>>>>
> >> >>>>> On 19.11.20 09:29, Andrey Zagrebin wrote:
> >> >>>>>> Hi Flavio,
> >> >>>>>>
> >> >>>>>> I think I can reproduce what you are reporting (assuming you also pass
> >> >>>>>> '--output' to 'flink run').
> >> >>>>>> I am not sure why it behaves like this. I would suggest filing a Jira
> >> >>>>>> ticket for this.
> >> >>>>>>
> >> >>>>>> Best,
> >> >>>>>> Andrey
> >> >>>>>>
> >> >>>>>> On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier <
> >> >>> [hidden email]
> >> >>>>>>
> >> >>>>>> wrote:
> >> >>>>>>
> >> >>>>>>> is this a bug or is it a documentation problem...?
> >> >>>>>>>
> >> >>>>>>> Il sab 14 nov 2020, 18:44 Flavio Pompermaier <[hidden email]>
> >> >>> ha
> >> >>>>>>> scritto:
> >> >>>>>>>
> >> >>>>>>>> I've also verified that the problem persist also using a modified
> >> >>>>> version
> >> >>>>>>>> of the WordCount class.
> >> >>>>>>>> If you add the code pasted at the end of this email at the end of
> >> >>> its
> >> >>>>>>>> main method you can verify that the listener is called if you run
> >> >>> the
> >> >>>>>>>> program from the IDE, but it's not called if you submit the job
> >> >>> using
> >> >>>>> the
> >> >>>>>>>> CLI client using the command
> >> >>>>>>>>
> >> >>>>>>>>       - bin/flink run
> >> >>>>>>>>
> >> >>>>>
> >> >>>   /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
> >> >>>>>>>>
> >> >>>>>>>> Maybe this is an expected result but I didn't find any
> >> >>> documentation of
> >> >>>>>>>> this behaviour (neither in the Javadoc or in the flink web site,
> >> >>> where
> >> >>>>> I
> >> >>>>>>>> can't find any documentation about JobListener at all).
> >> >>>>>>>>
> >> >>>>>>>> [Code to add to main()]
> >> >>>>>>>>        // emit result
> >> >>>>>>>>        if (params.has("output")) {
> >> >>>>>>>>          counts.writeAsCsv(params.get("output"), "\n", " ");
> >> >>>>>>>>          // execute program
> >> >>>>>>>>          env.registerJobListener(new JobListener() {
> >> >>>>>>>>
> >> >>>>>>>>            @Override
> >> >>>>>>>>            public void onJobSubmitted(JobClient arg0, Throwable
> >> >>> arg1) {
> >> >>>>>>>>              System.out.println("**************** SUBMITTED");
> >> >>>>>>>>            }
> >> >>>>>>>>
> >> >>>>>>>>            @Override
> >> >>>>>>>>            public void onJobExecuted(JobExecutionResult arg0,
> >> >>> Throwable
> >> >>>>>>>> arg1) {
> >> >>>>>>>>              System.out.println("**************** EXECUTED");
> >> >>>>>>>>            }
> >> >>>>>>>>          });
> >> >>>>>>>>          env.execute("WordCount Example");
> >> >>>>>>>>        } else {
> >> >>>>>>>>          System.out.println("Printing result to stdout. Use --output
> >> >>> to
> >> >>>>>>>> specify output path.");
> >> >>>>>>>>          counts.print();
> >> >>>>>>>>        }
> >> >>>>>>>>
> >> >>>>>>>> On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <
> >> >>>>> [hidden email]>
> >> >>>>>>>> wrote:
> >> >>>>>>>>
> >> >>>>>>>>> see inline
> >> >>>>>>>>>
> >> >>>>>>>>> Il ven 13 nov 2020, 14:31 Matthias Pohl <[hidden email]>
> >> >>> ha
> >> >>>>>>>>> scritto:
> >> >>>>>>>>>
> >> >>>>>>>>>> Hi Flavio,
> >> >>>>>>>>>> thanks for sharing this with the Flink community. Could you answer
> >> >>>>> the
> >> >>>>>>>>>> following questions, please:
> >> >>>>>>>>>> - What's the code of your Job's main method?
> >> >>>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>> it's actually very simple...the main class creates a batch
> >> >>> execution
> >> >>>>> env
> >> >>>>>>>>> using ExecutionEnvironment.getExecutionEnvironment(), I register a
> >> >>> job
> >> >>>>>>>>> listener to the env and I do some stuff before calling
> >> >>> env.execute().
> >> >>>>>>>>> The listener is executed correctly but if I use the
> >> >>> RestClusterClient
> >> >>>>> to
> >> >>>>>>>>> sibmit the jobGraph exyracted from that main contained in a jar,
> >> >>> the
> >> >>>>>>>>> program is executed as usual but the job listener is not called.
> >> >>>>>>>>>
> >> >>>>>>>>> - What cluster backend and application do you use to execute the
> >> >>> job?
> >> >>>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>> I use a standalone session cluster for the moment
> >> >>>>>>>>>
> >> >>>>>>>>> - Is there anything suspicious you can find in the logs that might
> >> >>> be
> >> >>>>>>>>>> related?
> >> >>>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>> no unfortunately..
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>>> Best,
> >> >>>>>>>>>> Matthias
> >> >>>>>>>>>>
> >> >>>>>>>>>> On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier <
> >> >>>>>>>>>> [hidden email]> wrote:
> >> >>>>>>>>>>
> >> >>>>>>>>>>> Actually what I'm experiencing is that the JobListener is
> >> >>> executed
> >> >>>>>>>>>>> successfully if I run my main class from the IDE, while the job
> >> >>>>> listener is
> >> >>>>>>>>>>> not fired at all if I submit the JobGraph of the application to a
> >> >>>>> cluster
> >> >>>>>>>>>>> using the RestClusterClient..
> >> >>>>>>>>>>> Am I doing something wrong?
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> My main class ends with the env.execute() and i do
> >> >>>>>>>>>>> env.registerJobListener() when I create the Exceution env
> >> >>>>>>>>>>> via ExecutionEnvironment.getExecutionEnvironment().
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Thanks in advance for any help,
> >> >>>>>>>>>>> Flavio
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <
> >> >>>>>>>>>>> [hidden email]> wrote:
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>> Hello everybody,
> >> >>>>>>>>>>>> I'm trying to use the JobListener to track when a job finishes
> >> >>>>> (with
> >> >>>>>>>>>>>> Flink 1.11.0).
> >> >>>>>>>>>>>> It works great but I have the problem that logs inside
> >> >>>>>>>>>>>> the onJobExecuted are not logged anywhere..is it normal?
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> Best,
> >> >>>>>>>>>>>> Flavio
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>
> >> >>>>>
> >> >>>>>
> >> >>>>
> >> >>>
> >> >>>
> >> >
> >>
Reply | Threaded
Open this post in threaded view
|

Re: Logs of JobExecutionListener

Aljoscha Krettek
In reply to this post by Flavio Pompermaier
On 20.11.20 22:09, Flavio Pompermaier wrote:
> To achieve this, I was using the
> RestClusterClient<StandaloneClusterId> because with that I can use the
> following code and retrieve the JobID:
>
>      (1) JobID flinkJobId =
> client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get().getJobID();

All you want to do is get the JobID, correct? If yes, you can just do a
`jobGraph.getJobID()`. The job id is not set on the cluster but it's
actually set client side, on the JobGraph object.

Does that help in your case?

A general comment on your other questions: yes, the listener logic if
only used when using the environments. It's not integrated with the
RestClusterClient, which is considered more of an internal
implementation detail.

Aljoscha

Reply | Threaded
Open this post in threaded view
|

Re: Logs of JobExecutionListener

Flavio Pompermaier
Thank you Aljosha,.now that's more clear!
I didn't know that jobGraph.getJobID() was the solution for my use case..I was convinced that the job ID was assigned by the cluster!
And to me it's really weird that the job listener was not called by the submitJob...Probably this should be documented at least.
In the meanwhile I extended a little bit the RestClusterClient..do you think it could be worth issuing a PR to add some unimplemented methods?

For example I added:
- public JobExceptionsInfo getFlinkJobExceptionsInfo(JobID flinkJobId);
- public EmptyResponseBody deleteJar(String jarFileName);
- public boolean isJobRunning(JobID fjid)
- public JarUploadResponseBody uploadJar(Path uploadedFile);

and I was also going to add jarRun..

Let me know,
Flavio

On Mon, Nov 23, 2020 at 3:57 PM Aljoscha Krettek <[hidden email]> wrote:
On 20.11.20 22:09, Flavio Pompermaier wrote:
> To achieve this, I was using the
> RestClusterClient<StandaloneClusterId> because with that I can use the
> following code and retrieve the JobID:
>
>      (1) JobID flinkJobId =
> client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get().getJobID();

All you want to do is get the JobID, correct? If yes, you can just do a
`jobGraph.getJobID()`. The job id is not set on the cluster but it's
actually set client side, on the JobGraph object.

Does that help in your case?

A general comment on your other questions: yes, the listener logic if
only used when using the environments. It's not integrated with the
RestClusterClient, which is considered more of an internal
implementation detail.

Aljoscha



--
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 041809
12