Recommended approach to debug this

Debasish Ghosh
Hi -

When you get an exception stack trace like this:

Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)

What is the recommended approach to debugging this? That is, what kinds of errors can lead to such a stack trace? In my case it starts from env.execute(..), but it gives no information about what went wrong.

Any help will be appreciated.

Re: Recommended approach to debug this

Dian Fu
This exception is used internally to obtain a job's plan before the job is submitted for execution. It is thrown on purpose, is caught internally in [1], and usually never reaches end users.
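
The mechanism described above can be modeled in a few lines of plain Scala. This is a simplified sketch, not Flink's code: `PlanExtraction`, `PlanCapturingEnv`, `ProgramAbort`, and `extractPlan` are hypothetical stand-ins for StreamPlanEnvironment, ProgramAbortException, and the catch site referenced as [1]. The environment records the plan when `execute` is called and then throws to stop the user program; the driver catches `Throwable` and returns the plan if one was captured.

```scala
// Simplified, hypothetical model of plan extraction; not Flink's actual classes.
object PlanExtraction {
  // Stand-in for ProgramAbortException: thrown only to stop the user program.
  final class ProgramAbort extends Error

  // Stand-in for a plan environment: execute() records the plan instead of running it.
  final class PlanCapturingEnv {
    private val operators = scala.collection.mutable.ListBuffer.empty[String]
    private var plan: Option[String] = None

    def addOperator(name: String): Unit = operators += name
    def capturedPlan: Option[String] = plan

    def execute(jobName: String): Unit = {
      plan = Some(jobName + ": " + operators.mkString(" -> "))
      throw new ProgramAbort // abort the rest of the user program
    }
  }

  // Stand-in for the driver: runs the program and catches every Throwable,
  // which also covers ProgramAbort even though that name never appears here.
  def extractPlan(program: PlanCapturingEnv => Unit): Either[Throwable, String] = {
    val env = new PlanCapturingEnv
    try {
      program(env)
      Left(new IllegalStateException("program finished without calling execute()"))
    } catch {
      case t: Throwable => env.capturedPlan.toRight(t)
    }
  }
}
```

With this model, a program that adds the operators "source" and "sink" and then calls `execute("demo")` yields `Right("demo: source -> sink")`, while a program that fails before reaching `execute` yields a `Left` holding the original error. Returning the real error in that second case is a design choice of this sketch; the abort itself carries no information.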

You could check the following to find the cause of this problem:
1. Check which execution environment you are using.
2. If you can debug, set a breakpoint at [2] and check whether the environment wrapped in StreamPlanEnvironment is an OptimizerPlanEnvironment. It usually should be.

Regards,
Dian


In reply to Debasish Ghosh (Sep 21, 2019, 4:14 AM)

Re: Recommended approach to debug this

Debasish Ghosh
Thanks for the pointer. I will try debugging. I am getting this exception when running my application on Kubernetes with the Flink operator from Lyft.

regards.

In reply to Dian Fu (Sat, 21 Sep 2019, 6:44 AM)

Re: Recommended approach to debug this

Debasish Ghosh
The problem is that I am submitting Flink jobs to a Kubernetes cluster using a Flink operator, so it's difficult to debug in the traditional sense. All I get is the exception I reported:

Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)

I suspect this exception is caused by some other exception, which, by the way, is not reported: I expected a further "Caused by" section in the stack trace. Any clue as to which area I should look into to debug this?
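
One hedged observation: Flink throws this exception with a no-argument constructor (`throw new OptimizerPlanEnvironment.ProgramAbortException()`, as the grep output later in this thread shows), so it presumably carries no cause of its own, which would explain why no further "Caused by" section appears. When logs are the only debugging tool, a small helper that lists every exception in a cause chain can confirm that. `CauseChain`, `causes`, and `describe` are hypothetical names for illustration, not part of Flink.

```scala
// Hypothetical debugging helper; not part of Flink.
object CauseChain {
  // Collects the throwable and all of its causes, outermost first.
  // Stops if a cause repeats, to guard against cyclic chains.
  def causes(t: Throwable): List[Throwable] = {
    @annotation.tailrec
    def loop(current: Throwable, acc: List[Throwable]): List[Throwable] =
      if (current == null || acc.exists(_ eq current)) acc.reverse
      else loop(current.getCause, current :: acc)
    loop(t, Nil)
  }

  // One line per exception, similar to the "Caused by:" headers of a stack trace.
  def describe(t: Throwable): String =
    causes(t).map(c => s"${c.getClass.getName}: ${c.getMessage}").mkString("\n")
}
```

Logging `CauseChain.describe(e)` for the outermost exception would show whether anything is chained behind the abort exception or whether the chain simply ends there.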

regards.

On Sat, Sep 21, 2019 at 8:10 AM Debasish Ghosh <[hidden email]> wrote:
Thanks for the pointer .. I will try debugging. I am getting this exception running my application on Kubernetes using the Flink operator from Lyft. 

regards.

On Sat, 21 Sep 2019 at 6:44 AM, Dian Fu <[hidden email]> wrote:
This exception is used internally to get the plan of a job before submitting it for execution. It's thrown with special purpose and will be caught internally in [1] and will not be thrown to end users usually. 

You could check the following places to find out the cause to this problem:
1. Check the execution environment you used
2. If you can debug, set a breakpoint at[2] to see if the type of the env wrapped in StreamPlanEnvironment is OptimizerPlanEnvironment. Usually it should be.

Regards,
Dian


在 2019年9月21日,上午4:14,Debasish Ghosh <[hidden email]> 写道:

Hi -

When you get an exception stack trace like this ..

Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)

what is the recommended approach of debugging ? I mean what kind of errors can potentially lead to such a stacktrace ? In my case it starts from env.execute(..) but does not give any information as to what can go wrong.

Any help will be appreciated.


--
Sent from my iPhone


--
Reply | Threaded
Open this post in threaded view
|

Re: Recommended approach to debug this

austin.ce
Have you reached out to the FlinkK8sOperator team on Slack? They’re usually quite active there.

Here’s the link:


Best,
Austin

On Sun, Sep 22, 2019 at 12:38 PM Debasish Ghosh <[hidden email]> wrote:
The problem is I am submitting Flink jobs to Kubernetes cluster using a Flink Operator. Hence it's difficult to debug in the traditional sense of the term. And all I get is the exception that I reported ..

Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)

I am thinking that this exception must be coming because of some other exceptions, which are not reported BTW. I expected a Caused By portion in the stack trace. Any clue as to which area I should look into to debug this.

regards.

On Sat, Sep 21, 2019 at 8:10 AM Debasish Ghosh <[hidden email]> wrote:
Thanks for the pointer .. I will try debugging. I am getting this exception running my application on Kubernetes using the Flink operator from Lyft. 

regards.

On Sat, 21 Sep 2019 at 6:44 AM, Dian Fu <[hidden email]> wrote:
This exception is used internally to get the plan of a job before submitting it for execution. It's thrown with special purpose and will be caught internally in [1] and will not be thrown to end users usually. 

You could check the following places to find out the cause to this problem:
1. Check the execution environment you used
2. If you can debug, set a breakpoint at[2] to see if the type of the env wrapped in StreamPlanEnvironment is OptimizerPlanEnvironment. Usually it should be.

Regards,
Dian


在 2019年9月21日,上午4:14,Debasish Ghosh <[hidden email]> 写道:

Hi -

When you get an exception stack trace like this ..

Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)

what is the recommended approach of debugging ? I mean what kind of errors can potentially lead to such a stacktrace ? In my case it starts from env.execute(..) but does not give any information as to what can go wrong.

Any help will be appreciated.


--
Sent from my iPhone


--
Reply | Threaded
Open this post in threaded view
|

Re: Recommended approach to debug this

tison
Hi Debasish,

As Dian mentioned, this is an internal exception that should always be caught by
Flink itself. I would suggest you share the job (abstractly). Generally it happens
because you use StreamPlanEnvironment/OptimizerPlanEnvironment directly.
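
A hypothetical sketch of that failure mode (all names are illustrative, not Flink's API): when a plan-capturing environment's `execute` is called under a driver that expects the abort, the exception is caught there; when user code creates such an environment and calls `execute` itself, nothing catches the abort and it propagates to the caller, much like the trace in this thread.

```scala
// Hypothetical model (illustrative names, not Flink's API).
object DirectUse {
  // Stand-in for ProgramAbortException.
  final class ProgramAbort extends Error

  // Stand-in for a plan environment whose execute() always aborts.
  final class PlanCapturingEnv {
    private var plan: Option[String] = None
    def capturedPlan: Option[String] = plan
    def execute(jobName: String): Unit = {
      plan = Some(jobName)
      throw new ProgramAbort
    }
  }

  // Expected path: a driver owns the environment and swallows the abort.
  def viaDriver(jobName: String): Option[String] = {
    val env = new PlanCapturingEnv
    try env.execute(jobName) catch { case _: ProgramAbort => () }
    env.capturedPlan
  }

  // Problem path: user code creates the environment and calls execute() itself,
  // so no one is there to catch the abort and it propagates to the caller.
  def direct(jobName: String): Unit =
    new PlanCapturingEnv().execute(jobName)
}
```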

Best,
tison.


In reply to Austin Cawley-Edwards (Mon, Sep 23, 2019, 5:09 AM)

Re: Recommended approach to debug this

Vijay Bhaskar
One more suggestion: run the same job on a regular two-node cluster and see whether you get the same exception. That way you can narrow down the issue more easily.

Regards
Bhaskar



On Mon, Sep 23, 2019 at 7:50 AM Zili Chen <[hidden email]> wrote:
Hi Debasish,

As mentioned by Dian, it is an internal exception that should be always caught by
Flink internally. I would suggest you share the job(abstractly). Generally it is because
you use StreamPlanEnvironment/OptimizerPlanEnvironment directly.

Best,
tison.


Austin Cawley-Edwards <[hidden email]> 于2019年9月23日周一 上午5:09写道:
Have you reached out to the FlinkK8sOperator team on Slack? They’re usually pretty active on there. 

Here’s the link:


Best,
Austin

On Sun, Sep 22, 2019 at 12:38 PM Debasish Ghosh <[hidden email]> wrote:
The problem is I am submitting Flink jobs to Kubernetes cluster using a Flink Operator. Hence it's difficult to debug in the traditional sense of the term. And all I get is the exception that I reported ..

Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)

I am thinking that this exception must be coming because of some other exceptions, which are not reported BTW. I expected a Caused By portion in the stack trace. Any clue as to which area I should look into to debug this.

regards.

On Sat, Sep 21, 2019 at 8:10 AM Debasish Ghosh <[hidden email]> wrote:
Thanks for the pointer .. I will try debugging. I am getting this exception running my application on Kubernetes using the Flink operator from Lyft. 

regards.

On Sat, 21 Sep 2019 at 6:44 AM, Dian Fu <[hidden email]> wrote:
This exception is used internally to get the plan of a job before submitting it for execution. It's thrown with special purpose and will be caught internally in [1] and will not be thrown to end users usually. 

You could check the following places to find out the cause to this problem:
1. Check the execution environment you used
2. If you can debug, set a breakpoint at[2] to see if the type of the env wrapped in StreamPlanEnvironment is OptimizerPlanEnvironment. Usually it should be.

Regards,
Dian


在 2019年9月21日,上午4:14,Debasish Ghosh <[hidden email]> 写道:

Hi -

When you get an exception stack trace like this ..

Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)

what is the recommended approach of debugging ? I mean what kind of errors can potentially lead to such a stacktrace ? In my case it starts from env.execute(..) but does not give any information as to what can go wrong.

Any help will be appreciated.


--
Sent from my iPhone


--
Reply | Threaded
Open this post in threaded view
|

Re: Recommended approach to debug this

Debasish Ghosh
In reply to this post by tison
Hi Tison -

This is the code that builds the computation graph. readStream reads from Kafka and writeStream writes to Kafka.

    override def buildExecutionGraph = {
      val rides: DataStream[TaxiRide] =
        readStream(inTaxiRide)
          .filter { ride ⇒ ride.getIsStart().booleanValue }
          .keyBy("rideId")

      val fares: DataStream[TaxiFare] =
        readStream(inTaxiFare)
          .keyBy("rideId")

      val processed: DataStream[TaxiRideFare] =
        rides
          .connect(fares)
          .flatMap(new EnrichmentFunction)

      writeStream(out, processed)
    }


I also verified that my code enters this function https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57 and that the exception is then thrown. I grepped the Flink code base to see where this exception is caught. Excluding the tests, I don't see any place that catches it:

$ find . -name "*.java" | xargs grep "OptimizerPlanEnvironment.ProgramAbortException"
./flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java:import org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException;
./flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java: @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, timeout = 30_000)

What am I missing here?

regards.

On Mon, Sep 23, 2019 at 7:50 AM Zili Chen <[hidden email]> wrote:
Hi Debasish,

As mentioned by Dian, it is an internal exception that should be always caught by
Flink internally. I would suggest you share the job(abstractly). Generally it is because
you use StreamPlanEnvironment/OptimizerPlanEnvironment directly.

Best,
tison.


Austin Cawley-Edwards <[hidden email]> 于2019年9月23日周一 上午5:09写道:
Have you reached out to the FlinkK8sOperator team on Slack? They’re usually pretty active on there. 

Here’s the link:


Best,
Austin

On Sun, Sep 22, 2019 at 12:38 PM Debasish Ghosh <[hidden email]> wrote:
The problem is I am submitting Flink jobs to Kubernetes cluster using a Flink Operator. Hence it's difficult to debug in the traditional sense of the term. And all I get is the exception that I reported ..

Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)

I am thinking that this exception must be coming because of some other exceptions, which are not reported BTW. I expected a Caused By portion in the stack trace. Any clue as to which area I should look into to debug this.

regards.

On Sat, Sep 21, 2019 at 8:10 AM Debasish Ghosh <[hidden email]> wrote:
Thanks for the pointer .. I will try debugging. I am getting this exception running my application on Kubernetes using the Flink operator from Lyft. 

regards.

On Sat, 21 Sep 2019 at 6:44 AM, Dian Fu <[hidden email]> wrote:
This exception is used internally to get the plan of a job before submitting it for execution. It's thrown with special purpose and will be caught internally in [1] and will not be thrown to end users usually. 

You could check the following places to find out the cause to this problem:
1. Check the execution environment you used
2. If you can debug, set a breakpoint at[2] to see if the type of the env wrapped in StreamPlanEnvironment is OptimizerPlanEnvironment. Usually it should be.

Regards,
Dian


在 2019年9月21日,上午4:14,Debasish Ghosh <[hidden email]> 写道:

Hi -

When you get an exception stack trace like this ..

Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)

what is the recommended approach of debugging ? I mean what kind of errors can potentially lead to such a stacktrace ? In my case it starts from env.execute(..) but does not give any information as to what can go wrong.

Any help will be appreciated.


--
Sent from my iPhone


--


--
Reply | Threaded
Open this post in threaded view
|

Re: Recommended approach to debug this

Dian Fu
Hi Debasish,

As I said before, the exception is caught in [1]. That code catches Throwable, so it also catches "OptimizerPlanEnvironment.ProgramAbortException". As for the cause of this exception, I agree with Tison: I also think the wrong StreamExecutionEnvironment is being used.

Regards,
Dian


In reply to Debasish Ghosh (Sep 23, 2019, 6:08 PM)

Re: Recommended approach to debug this

Debasish Ghosh
Ah, OK, I get the Throwable part. I am using:

import org.apache.flink.streaming.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment

How can this lead to the wrong StreamExecutionEnvironment? Any suggestions?
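
For context, `getExecutionEnvironment` does not always return the same kind of environment: it picks one based on how the program was launched (for example, a local environment when run from an IDE, or a plan environment when a client only needs the job graph). The sketch below is a simplified, hypothetical model of that context-based lookup; `EnvLookup`, `Env`, `PlanEnv`, `LocalEnv`, and `withContext` are illustrative names, not Flink's API. It shows why the same line of user code can yield a plan-capturing environment under one launcher and a local one otherwise.

```scala
// Hypothetical model of a context-based environment lookup; not Flink's code.
object EnvLookup {
  sealed trait Env { def name: String }
  final case class PlanEnv() extends Env { val name = "plan" }
  final case class LocalEnv() extends Env { val name = "local" }

  // Set by the launcher for the duration of a program run; None otherwise.
  private var contextFactory: Option[() => Env] = None

  // Prefer the launcher's context environment; fall back to a local one.
  def getExecutionEnvironment: Env = contextFactory.map(f => f()).getOrElse(LocalEnv())

  // Installs a context factory while `body` runs, then restores the previous one.
  def withContext[A](factory: () => Env)(body: => A): A = {
    val previous = contextFactory
    contextFactory = Some(factory)
    try body
    finally contextFactory = previous
  }
}
```

In this model the environment is decided by the launcher rather than by the import; Dian's suggestion to check which environment is wrapped in StreamPlanEnvironment is one way to see which branch was actually taken.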

regards.

On Mon, Sep 23, 2019 at 3:53 PM Dian Fu <[hidden email]> wrote:
Hi Debasish,

As I said before, the exception is caught in [1]. It catches the Throwable and so it could also catch "OptimizerPlanEnvironment.ProgramAbortException". Regarding to the cause of this exception, I have the same feeling with Tison and I also think that the wrong StreamExecutionEnvironment is used.

Regards,
Dian


On Sep 23, 2019, at 6:08 PM, Debasish Ghosh <[hidden email]> wrote:

Hi Tison -

This is the code that builds the computation graph. readStream reads from Kafka and writeStream writes to Kafka.

    override def buildExecutionGraph = {
      val rides: DataStream[TaxiRide] =
        readStream(inTaxiRide)
          .filter { ride ⇒ ride.getIsStart().booleanValue }
          .keyBy("rideId")

      val fares: DataStream[TaxiFare] =
        readStream(inTaxiFare)
          .keyBy("rideId")

      val processed: DataStream[TaxiRideFare] =
        rides
          .connect(fares)
          .flatMap(new EnrichmentFunction)

      writeStream(out, processed)
    }


I also checked that my code enters this function https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57 and then the exception is thrown. I grepped the Flink code base to see where this exception is caught. Excluding the tests, I don't see it caught anywhere:

$ find . -name "*.java" | xargs grep "OptimizerPlanEnvironment.ProgramAbortException"
./flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java:import org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException;
./flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java: @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, timeout = 30_000)

What am I missing here?

regards.

On Mon, Sep 23, 2019 at 7:50 AM Zili Chen <[hidden email]> wrote:
Hi Debasish,

As mentioned by Dian, it is an internal exception that should always be caught by
Flink internally. I would suggest you share the job (abstractly). Generally this happens
because you use StreamPlanEnvironment/OptimizerPlanEnvironment directly.

Best,
tison.


On Mon, Sep 23, 2019 at 5:09 AM, Austin Cawley-Edwards <[hidden email]> wrote:
Have you reached out to the FlinkK8sOperator team on Slack? They're usually pretty active there.

Here’s the link:


Best,
Austin

On Sun, Sep 22, 2019 at 12:38 PM Debasish Ghosh <[hidden email]> wrote:
The problem is that I am submitting Flink jobs to a Kubernetes cluster using a Flink operator, so it's difficult to debug in the traditional sense of the term. All I get is the exception I reported:

Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)

I suspect this exception is caused by some other exception, which is not reported, BTW. I expected a "Caused by" section in the stack trace. Any clue as to which area I should look into to debug this?
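When a debugger cannot be attached, one option is to log the whole cause chain yourself wherever the job's outcome is handled. A minimal Scala sketch (runs as a script); causeChain and describeChain are hypothetical helper names, not Flink APIs. Note that ProgramAbortException is thrown with no cause (`throw new OptimizerPlanEnvironment.ProgramAbortException();`), so if it is the innermost entry there is nothing deeper to find: it is the plan-extraction signal itself.

```scala
import scala.annotation.tailrec

// Returns the throwable followed by each of its causes, outermost first.
def causeChain(t: Throwable): List[Throwable] = {
  @tailrec
  def loop(current: Throwable, acc: List[Throwable]): List[Throwable] =
    // Stop at the end of the chain, or if a cause refers back to an earlier one.
    if (current == null || acc.exists(_ eq current)) acc.reverse
    else loop(current.getCause, current :: acc)
  loop(t, Nil)
}

// Renders the chain one level per line, using the "Caused by:" convention of
// printed stack traces, so it fits in a single log statement.
def describeChain(t: Throwable): String =
  causeChain(t).zipWithIndex.map { case (c, i) =>
    val prefix = if (i == 0) "" else "Caused by: "
    prefix + c.getClass.getName + Option(c.getMessage).fold("")(m => s": $m")
  }.mkString("\n")

// Example: a wrapper exception around a root cause.
val wrapped = new java.util.concurrent.ExecutionException("Boxed Error", new IllegalStateException("root"))
val summary = describeChain(wrapped)
```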

regards.

On Sat, Sep 21, 2019 at 8:10 AM Debasish Ghosh <[hidden email]> wrote:
Thanks for the pointer. I will try debugging. I am getting this exception when running my application on Kubernetes using the Flink operator from Lyft.

regards.


Re: Recommended approach to debug this

Debasish Ghosh
Could it be that the ThreadLocal logic in https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1609 does not behave deterministically when we submit the job through a Kubernetes Flink operator? Utils also selects the factory that creates the context based on either thread-local storage or a static mutable variable.

Could these be the source of the problem in our case?
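The two lookups mentioned here behave differently across threads: a value stored in a ThreadLocal is visible only on the thread that set it, while a static field is shared by all threads. A thread-local lookup is therefore deterministic per thread; the risk is looking the factory up on a different thread from the one where it was installed. A minimal Scala sketch (runs as a script) of a registry that prefers a thread-local factory, then a static one, then a default. EnvFactories, createOnOtherThread and the string "environments" are hypothetical and only loosely modelled on the idea, not on Flink's actual code:

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical registry, NOT Flink's code: a factory maps a job name to a
// description of the environment it would create.
object EnvFactories {
  private val threadLocalFactory = new ThreadLocal[String => String]
  @volatile private var staticFactory: Option[String => String] = None

  def setThreadLocal(f: String => String): Unit = threadLocalFactory.set(f)
  def clearThreadLocal(): Unit = threadLocalFactory.remove()
  def setStatic(f: String => String): Unit = staticFactory = Some(f)
  def clearStatic(): Unit = staticFactory = None

  // Prefer the current thread's factory, then the shared static one, then a default.
  def create(job: String): String =
    Option(threadLocalFactory.get)
      .orElse(staticFactory)
      .map(factory => factory(job))
      .getOrElse(s"local($job)")
}

// Runs create() on a fresh thread and returns its result.
def createOnOtherThread(job: String): String = {
  val result = new AtomicReference[String]()
  val task: Runnable = () => result.set(EnvFactories.create(job))
  val worker = new Thread(task)
  worker.start()
  worker.join()
  result.get
}

EnvFactories.setThreadLocal(job => s"plan($job)")
val sameThread = EnvFactories.create("rides")   // "plan(rides)"
val otherThread = createOnOtherThread("rides")  // "local(rides)": the thread-local factory is not visible there
EnvFactories.clearThreadLocal()

EnvFactories.setStatic(job => s"context($job)")
val otherThreadStatic = createOnOtherThread("rides") // "context(rides)": static state is shared
EnvFactories.clearStatic()
```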

regards.


Re: Recommended approach to debug this

Dian Fu
Regarding the code you pasted, personally I don't think anything is wrong with it. The problem is how it's executed. As you can see from the implementation of StreamExecutionEnvironment.getExecutionEnvironment, it may create different StreamExecutionEnvironment implementations in different scenarios. Could you paste the full exception stack trace, if there is one? It's difficult to figure out what's wrong from the current stack trace.

Regards,
Dian


Re: Recommended approach to debug this

Debasish Ghosh
This is the complete stack trace we get when executing on Kubernetes using the Flink Kubernetes operator. The boxed error comes from the fact that we complete a Promise with Success when execution returns a JobExecutionResult, and with Failure when we get an exception. Here we are getting an exception, so the real stack trace is the one below, under "Caused by".

java.util.concurrent.ExecutionException: Boxed Error
at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at scala.concurrent.Promise.tryFailure(Promise.scala:112)
at scala.concurrent.Promise.tryFailure$(Promise.scala:112)
at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:187)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3(FlinkStreamlet.scala:186)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3$adapted(FlinkStreamlet.scala:186)
at scala.util.Failure.fold(Try.scala:240)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:187)
at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:153)
at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
at scala.util.Try$.apply(Try.scala:213)
at pipelines.runner.Runner$.run(Runner.scala:43)
at pipelines.runner.Runner$.main(Runner.scala:30)
at pipelines.runner.Runner.main(Runner.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:320)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$2(FlinkStreamlet.scala:184)
at scala.util.Try$.apply(Try.scala:213)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:184)
... 20 more
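The "Boxed Error" message is produced by Scala's Promise when it is completed with a java.lang.Error, which fits ProgramAbortException extending Error in Flink 1.9. The trace shows the signal being captured by scala.util.Try and then rewrapped by the Promise. Whether that alone breaks plan extraction depends on what the runner does next, but it does mean user code is intercepting a signal meant for Flink. A self-contained Scala sketch (runs as a script) of that interaction; AbortSignal, executeForPlan and executeLettingAbortThrough are hypothetical stand-ins, not names from the original code or from Flink:

```scala
import scala.concurrent.Promise
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for Flink's ProgramAbortException, which in Flink 1.9
// extends java.lang.Error rather than Exception.
final class AbortSignal extends Error

// Stand-in for env.execute() during plan extraction: it never returns normally.
def executeForPlan(): String = throw new AbortSignal

// A wrapper in the style described above: run execute() inside Try and
// complete a Promise with the outcome.
val promise = Promise[String]()
Try(executeForPlan()) match {
  case Success(result) => promise.success(result)
  case Failure(error)  => promise.failure(error)
}

// Try treats a plain Error subclass as non-fatal, so it captures the signal, and
// Promise wraps Error values in a java.util.concurrent.ExecutionException
// (the message text varies by Scala version; 2.12 uses "Boxed Error").
val boxed: Throwable = promise.future.value.get.failed.get

// One possible adjustment (an assumption, not a confirmed fix): let the signal
// escape so that the caller waiting for it receives it unchanged.
def executeLettingAbortThrough(): Try[String] =
  try Success(executeForPlan())
  catch {
    case abort: AbortSignal => throw abort
    case NonFatal(e)        => Failure(e)
  }

val escaped: Boolean =
  try { executeLettingAbortThrough(); false }
  catch { case _: AbortSignal => true }
```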

regards.

On Mon, Sep 23, 2019 at 5:36 PM Dian Fu <[hidden email]> wrote:
Regarding the code you pasted, personally I don't think anything is wrong with it. The problem is how it's executed. As you can see from the implementation of StreamExecutionEnvironment.getExecutionEnvironment, it may create different StreamExecutionEnvironment implementations in different scenarios. Could you paste the full exception stack trace, if there is one? It's difficult to figure out what's wrong from the current stack trace.

Regards,
Dian

On Sep 23, 2019, at 6:55 PM, Debasish Ghosh <[hidden email]> wrote:

Could it be that the ThreadLocal logic in https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1609 does not behave deterministically when we submit the job through a Kubernetes Flink operator? Utils also selects the factory that creates the context based on either thread-local storage or a static mutable variable.

Could these be the source of the problem in our case?

regards.

On Mon, Sep 23, 2019 at 3:58 PM Debasish Ghosh <[hidden email]> wrote:
Ah, OK. I get the Throwable part. I am using:

import org.apache.flink.streaming.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment

How can this lead to the wrong StreamExecutionEnvironment? Any suggestions?

regards.

On Mon, Sep 23, 2019 at 3:53 PM Dian Fu <[hidden email]> wrote:
Hi Debasish,

As I said before, the exception is caught in [1]. That code catches Throwable, so it also catches "OptimizerPlanEnvironment.ProgramAbortException". As for the cause of this exception, I agree with Tison: I also think the wrong StreamExecutionEnvironment is being used.

Regards,
Dian


On Sep 23, 2019, at 6:08 PM, Debasish Ghosh <[hidden email]> wrote:

Hi Tison -

This is the code that builds the computation graph. readStream reads from Kafka and writeStream writes to Kafka.

    override def buildExecutionGraph = {
      val rides: DataStream[TaxiRide] =
        readStream(inTaxiRide)
          .filter { ride ⇒ ride.getIsStart().booleanValue }
          .keyBy("rideId")

      val fares: DataStream[TaxiFare] =
        readStream(inTaxiFare)
          .keyBy("rideId")

      val processed: DataStream[TaxiRideFare] =
        rides
          .connect(fares)
          .flatMap(new EnrichmentFunction)

      writeStream(out, processed)
    }


I also checked that my code enters this function https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57 and then the exception is thrown. I grepped the Flink code base to see where this exception is caught. Excluding the tests, I don't see it caught anywhere:

$ find . -name "*.java" | xargs grep "OptimizerPlanEnvironment.ProgramAbortException"
./flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java:import org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException;
./flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java: @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, timeout = 30_000)

What am I missing here?

regards.

On Mon, Sep 23, 2019 at 7:50 AM Zili Chen <[hidden email]> wrote:
Hi Debasish,

As mentioned by Dian, it is an internal exception that should always be caught by
Flink internally. I would suggest you share the job (abstractly). Generally this happens because
you use StreamPlanEnvironment/OptimizerPlanEnvironment directly.

Best,
tison.


On Mon, Sep 23, 2019 at 5:09 AM, Austin Cawley-Edwards <[hidden email]> wrote:
Have you reached out to the FlinkK8sOperator team on Slack? They’re usually pretty active on there. 

Here’s the link:


Best,
Austin

On Sun, Sep 22, 2019 at 12:38 PM Debasish Ghosh <[hidden email]> wrote:
The problem is that I am submitting Flink jobs to a Kubernetes cluster using a Flink operator, so it's difficult to debug in the traditional sense of the term. All I get is the exception that I reported:

Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)

I suspect this exception is caused by some other exception, which is not reported. I expected a "Caused by" section in the stack trace. Any clue as to which area I should look into to debug this?

regards.
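On the missing "Caused by": the grep output in this thread shows ProgramAbortException always being constructed with no arguments (throw new OptimizerPlanEnvironment.ProgramAbortException()), so it carries no cause and the chain legitimately ends there. The useful information is which code caught the signal, not a deeper cause. When a debugger is not available, logging the whole chain from wherever the failure is finally handled can still help. A generic Java helper might look like this; CauseChain and AbortSignal are hypothetical names, not Flink API, and the "Boxed Error" wrapper in main only mimics the one in the full stack trace posted in this thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;

// Generic helper (not part of Flink): list a throwable and all of its causes,
// outermost first, guarding against cyclic cause chains.
class CauseChain {

    static List<String> describe(Throwable t) {
        List<String> out = new ArrayList<>();
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable cur = t; cur != null && seen.add(cur); cur = cur.getCause()) {
            out.add(cur.getClass().getName() + ": " + cur.getMessage());
        }
        return out;
    }

    // Stand-in for an abort signal created with no message and no cause.
    static final class AbortSignal extends Error {
        private static final long serialVersionUID = 1L;
    }

    public static void main(String[] args) {
        Throwable wrapped = new ExecutionException("Boxed Error", new AbortSignal());
        // Prints two lines; the chain stops at the signal because it has no cause.
        describe(wrapped).forEach(System.out::println);
    }
}
```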

On Sat, Sep 21, 2019 at 8:10 AM Debasish Ghosh <[hidden email]> wrote:
Thanks for the pointer; I will try debugging. I am getting this exception when running my application on Kubernetes using the Flink operator from Lyft. 

regards.

On Sat, 21 Sep 2019 at 6:44 AM, Dian Fu <[hidden email]> wrote:
This exception is used internally to get the plan of a job before submitting it for execution. It's thrown for this special purpose, is caught internally in [1], and is usually not thrown to end users. 

You could check the following places to find out the cause of this problem:
1. Check the execution environment you used.
2. If you can debug, set a breakpoint at [2] to see whether the type of the env wrapped in StreamPlanEnvironment is OptimizerPlanEnvironment. Usually it should be.

Regards,
Dian
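Where a breakpoint is impractical (for example, a job submitted through an operator on Kubernetes), step 2 can be approximated by logging the environment types just before execute(). The generic Java reflection helper below is a sketch, not a Flink utility. Using it on the real environment rests on two assumptions to verify against the 1.9 source: that the Scala StreamExecutionEnvironment exposes its Java counterpart via getJavaEnv, and that StreamPlanEnvironment keeps the wrapped environment in a private field named env.

```java
import java.lang.reflect.Field;

// Generic reflection helper (not Flink API): report the runtime class of an
// object and of one of its fields, e.g. to log which environment a wrapper
// holds when a debugger cannot be attached.
class EnvInspector {

    static String describe(Object outer, String fieldName) {
        StringBuilder sb = new StringBuilder(outer.getClass().getName());
        // Walk up the hierarchy, since the field may be declared in a superclass.
        for (Class<?> c = outer.getClass(); c != null; c = c.getSuperclass()) {
            try {
                Field f = c.getDeclaredField(fieldName);
                f.setAccessible(true);
                Object inner = f.get(outer);
                return sb.append(" wraps ")
                         .append(inner == null ? "null" : inner.getClass().getName())
                         .toString();
            } catch (NoSuchFieldException e) {
                // Not declared at this level; keep looking in the superclass.
            } catch (IllegalAccessException e) {
                return sb.append(" (field '").append(fieldName).append("' not accessible)").toString();
            }
        }
        return sb.append(" (no field '").append(fieldName).append("')").toString();
    }

    // Toy stand-ins so the sketch runs on its own.
    static class InnerEnv {}
    static class OuterEnv { private final Object env = new InnerEnv(); }

    public static void main(String[] args) {
        System.out.println(describe(new OuterEnv(), "env"));
    }
}
```

From the Scala job this could be called as EnvInspector.describe(env.getJavaEnv, "env") and the result written to the job's log.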


On Sep 21, 2019, at 4:14 AM, Debasish Ghosh <[hidden email]> wrote:

Hi -

When you get an exception stack trace like this ..

Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)

What is the recommended approach to debugging this? I mean, what kind of errors can potentially lead to such a stack trace? In my case it starts from env.execute(..) but does not give any information about what went wrong.

Any help will be appreciated.


Re: Recommended approach to debug this

tison
Hi Debasish,

The OptimizerPlanEnvironment.ProgramAbortException should be caught at OptimizerPlanEnvironment#getOptimizedPlan
in its catch (Throwable t) branch.

It should always throw a ProgramInvocationException instead of OptimizerPlanEnvironment.ProgramAbortException if any
exception is thrown in the main method of your code.

Another important question is how the code is executed (setting the context environment should be another Flink-internal operation),
but given that you submit the job via the Flink k8s operator, it may take some time to look into the operator's implementation.

However, given that we catch Throwable where this exception is thrown, I strongly doubt that it is executed by an official
Flink release.

A complete version of the code and of the submission process would be helpful. Also, what is buildExecutionGraph's return type?
I assume it is not Flink's ExecutionGraph...

Best,
tison.


On Mon, Sep 23, 2019 at 8:21 PM, Debasish Ghosh <[hidden email]> wrote:
This is the complete stack trace we get when executing on Kubernetes using the Flink Kubernetes operator. The boxed error comes from the fact that we complete a Promise with Success when execute returns a JobExecutionResult and with Failure when we get an exception. Here we are getting an exception, so the real stack trace is the one below under "Caused by". 

java.util.concurrent.ExecutionException: Boxed Error
at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at scala.concurrent.Promise.tryFailure(Promise.scala:112)
at scala.concurrent.Promise.tryFailure$(Promise.scala:112)
at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:187)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3(FlinkStreamlet.scala:186)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3$adapted(FlinkStreamlet.scala:186)
at scala.util.Failure.fold(Try.scala:240)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:187)
at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:153)
at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
at scala.util.Try$.apply(Try.scala:213)
at pipelines.runner.Runner$.run(Runner.scala:43)
at pipelines.runner.Runner$.main(Runner.scala:30)
at pipelines.runner.Runner.main(Runner.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:320)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$2(FlinkStreamlet.scala:184)
at scala.util.Try$.apply(Try.scala:213)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:184)
... 20 more

regards.
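The full trace suggests one concrete thing to check. scala.util.Try catches any throwable that scala.util.control.NonFatal considers non-fatal, and that includes ordinary Error subclasses (only VirtualMachineError, ThreadDeath, InterruptedException, LinkageError and ControlThrowable are excluded). Scala 2.12's Promise wraps an Error in an ExecutionException with the message "Boxed Error", and the trace shows exactly that wrapper with ProgramAbortException as its cause, which suggests ProgramAbortException is an Error subclass. If so, the Try around execute() consumes the signal that OptimizerPlanEnvironment#getOptimizedPlan is waiting for. This is a hypothesis consistent with the trace, not a confirmed diagnosis. Below is a hypothetical Java model of the situation, using CompletableFuture in place of the Scala Promise; all names are invented.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical model (not Flink or Pipelines code) of a runner that reports the
// outcome of execute() through a future, and of a driver that needs the abort
// signal to escape main().
class AbortPassThroughDemo {

    static final class AbortSignal extends Error {
        private static final long serialVersionUID = 1L;
    }

    // Catch-all wrapper: turns every throwable into a failed future, so the
    // abort signal is consumed here and main() returns normally.
    static CompletableFuture<String> runSwallowing(Supplier<String> execute) {
        CompletableFuture<String> result = new CompletableFuture<>();
        try {
            result.complete(execute.get());
        } catch (Throwable t) {
            result.completeExceptionally(t);
        }
        return result;
    }

    // Same wrapper, but the control-flow signal is rethrown untouched.
    static CompletableFuture<String> runPassingAbort(Supplier<String> execute) {
        CompletableFuture<String> result = new CompletableFuture<>();
        try {
            result.complete(execute.get());
        } catch (AbortSignal abort) {
            throw abort;                       // let the driver see it
        } catch (Throwable t) {
            result.completeExceptionally(t);
        }
        return result;
    }

    // Driver model: did the abort signal reach us?
    static boolean driverSawAbort(Runnable userMain) {
        try {
            userMain.run();
            return false;
        } catch (AbortSignal expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        Supplier<String> planOnlyExecute = () -> { throw new AbortSignal(); };
        System.out.println("swallowing wrapper:   " + driverSawAbort(() -> runSwallowing(planOnlyExecute)));   // false
        System.out.println("pass-through wrapper: " + driverSawAbort(() -> runPassingAbort(planOnlyExecute))); // true
    }
}
```

In the Scala runner, the analogous change would be to let the abort signal escape main() rather than completing the Promise with it; whether that resolves the issue in this setup is untested.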

On Mon, Sep 23, 2019 at 5:36 PM Dian Fu <[hidden email]> wrote:
Regarding the code you pasted, personally I don't think anything is wrong with it. The problem is how it's executed. As you can see from the implementation of StreamExecutionEnvironment.getExecutionEnvironment, it may create different StreamExecutionEnvironment implementations in different scenarios. Could you paste the full exception stack trace, if there is one? It's difficult to figure out what's wrong from the current stack trace.

Regards,
Dian
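Dian's point that the same call can yield different environments can be pictured with a toy Java model. It is a sketch under simplifying assumptions: InstalledContext, PlanEnv and the other names are invented, and Flink's real selection logic (see the linked StreamExecutionEnvironment source) has more cases. For the REST path, the full stack trace in this thread shows main() being invoked from OptimizerPlanEnvironment.getOptimizedPlan (via JarRunHandler and PackagedProgramUtils.createJobGraph), so receiving a plan-extraction environment there appears to be expected rather than a misconfiguration.

```java
// Hypothetical, heavily simplified model of a getExecutionEnvironment()-style
// factory: the same user call returns a different environment type depending
// on which context the launcher installed before invoking main(). Not Flink code.
class EnvSelectionDemo {

    enum InstalledContext { NONE, CLUSTER_CLIENT, PLAN_EXTRACTION }

    static final class AbortSignal extends Error {
        private static final long serialVersionUID = 1L;
    }

    interface Env { String execute(); }

    static final class LocalEnv implements Env {
        public String execute() { return "ran locally"; }
    }

    static final class ClusterEnv implements Env {
        public String execute() { return "submitted to the cluster"; }
    }

    // Only builds the plan; it never runs the job.
    static final class PlanEnv implements Env {
        public String execute() { throw new AbortSignal(); }
    }

    static Env getExecutionEnvironment(InstalledContext ctx) {
        switch (ctx) {
            case CLUSTER_CLIENT:  return new ClusterEnv();
            case PLAN_EXTRACTION: return new PlanEnv();
            default:              return new LocalEnv();
        }
    }

    public static void main(String[] args) {
        for (InstalledContext ctx : InstalledContext.values()) {
            // e.g. "PLAN_EXTRACTION -> PlanEnv"
            System.out.println(ctx + " -> " + getExecutionEnvironment(ctx).getClass().getSimpleName());
        }
    }
}
```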

Re: Recommended approach to debug this

Debasish Ghosh
Hi tison -

Please find my responses inline below, marked with >>.

regards.

On Mon, Sep 23, 2019 at 6:20 PM Zili Chen <[hidden email]> wrote:
Hi Debasish,

The OptimizerPlanEnvironment.ProgramAbortException should be caught at OptimizerPlanEnvironment#getOptimizedPlan
in its catch (Throwable t) branch.

>> True, but what I get is a StreamPlanEnvironment. In my code I am only doing val env = StreamExecutionEnvironment.getExecutionEnvironment

It should always throw a ProgramInvocationException instead of OptimizerPlanEnvironment.ProgramAbortException if any
exception is thrown in the main method of your code.

Another important question is how the code is executed (setting the context environment should be another Flink-internal operation),
but given that you submit the job via the Flink k8s operator, it may take some time to look into the operator's implementation.

>> We submit the code through the Kubernetes Flink operator, which uses the REST API to submit the job to the Job Manager.

However, given that we catch Throwable where this exception is thrown, I strongly doubt that it is executed by an official
Flink release.

>> It is the official Flink 1.9.0 release.

A complete version of the code and of the submission process would be helpful. Also, what is buildExecutionGraph's return type?
I assume it is not Flink's ExecutionGraph...

>> buildExecutionGraph is our own function, which returns Unit. It is not an ExecutionGraph. It builds the DataStreams by reading from Kafka and finally writes to Kafka.

Best,
tison.


在 2019年9月21日,上午4:14,Debasish Ghosh <[hidden email]> 写道:

Hi -

When you get an exception stack trace like this ..

Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)

what is the recommended approach of debugging ? I mean what kind of errors can potentially lead to such a stacktrace ? In my case it starts from env.execute(..) but does not give any information as to what can go wrong.

Any help will be appreciated.


--
Sent from my iPhone


--


--



--


--



--


--
Reply | Threaded
Open this post in threaded view
|

Re: Recommended approach to debug this

Dian Fu
Hi Debasish,

In which case does the exception occur? Does it occur when you submit one job at a time, or when multiple jobs are submitted at the same time? I'm asking because I noticed that you used a Future to execute the job without blocking. I guess ThreadLocal doesn't work well in this case.

Regards,
Dian
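Dian's ThreadLocal concern can be illustrated without Flink. The following is a minimal Scala sketch using only the standard library, not Flink code: a value set in a ThreadLocal on the calling thread is not visible to code that a Future runs on a pool thread. The name contextEnv and the string it holds are hypothetical stand-ins for a thread-local execution-environment factory.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ThreadLocalAcrossFuture {
  // Hypothetical stand-in for a per-thread "context environment" slot.
  val contextEnv: ThreadLocal[String] = new ThreadLocal[String]

  /** Returns what the calling thread and a Future on a pool thread each see. */
  def observe(): (Option[String], Option[String]) = {
    // A dedicated pool guarantees the Future body runs on a different thread.
    val pool = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      contextEnv.set("plan-capturing env") // registered on the calling thread only
      val onCaller = Option(contextEnv.get())
      val inFuture = Await.result(Future(Option(contextEnv.get())), 5.seconds)
      (onCaller, inFuture)
    } finally {
      contextEnv.remove()
      pool.shutdown()
    }
  }
}
```

Calling observe() returns (Some("plan-capturing env"), None): the pool thread never saw the value. If an environment lookup happens inside such a Future, it may find a different environment than the one registered on the submitting thread. This shows a possible pitfall, not a confirmed cause of the error in this thread.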

On Sep 23, 2019, at 11:57 PM, Debasish Ghosh <[hidden email]> wrote:

Hi tison -

Please find my response below in >>.

regards.

On Mon, Sep 23, 2019 at 6:20 PM Zili Chen <[hidden email]> wrote:
Hi Debasish,

The OptimizerPlanEnvironment.ProgramAbortException should be caught at OptimizerPlanEnvironment#getOptimizedPlan
in its catch (Throwable t) branch.

>> True, but what I get is a StreamPlanEnvironment. In my code I am only doing val env = StreamExecutionEnvironment.getExecutionEnvironment

It should always throw a ProgramInvocationException instead of OptimizerPlanEnvironment.ProgramAbortException if any
exception is thrown in the main method of your code.

Another important question is how the code is executed (setting the context environment should be another Flink-internal operation),
but given that you submit the job via the Flink k8s operator, it might take time to look into the k8s operator implementation.

>> We submit the code through the Kubernetes Flink Operator, which uses the REST API to submit the job to the Job Manager.

However, given that we catch Throwable around the place where this exception is thrown, I doubt whether it is executed by an official
Flink release.

>> It is an official Flink release 1.9.0 

A complete version of the code and the submission process would be helpful. Besides, what is buildExecutionGraph's return type?
I don't think it is Flink's ExecutionGraph...

>> buildExecutionGraph is our function, which returns Unit. It's not ExecutionGraph. It builds the DataStreams by reading from Kafka and then finally writes to Kafka.

Best,
tison.


On Mon, Sep 23, 2019 at 8:21 PM, Debasish Ghosh <[hidden email]> wrote:
This is the complete stack trace we get from execution on Kubernetes using the Flink Kubernetes operator. The boxed error comes from the fact that we complete a Promise with Success when execution returns a JobExecutionResult, and with Failure when we get an exception. Here we are getting an exception, so the real stack trace is the one below in Caused By.

java.util.concurrent.ExecutionException: Boxed Error
at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at scala.concurrent.Promise.tryFailure(Promise.scala:112)
at scala.concurrent.Promise.tryFailure$(Promise.scala:112)
at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:187)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3(FlinkStreamlet.scala:186)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3$adapted(FlinkStreamlet.scala:186)
at scala.util.Failure.fold(Try.scala:240)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:187)
at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:153)
at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
at scala.util.Try$.apply(Try.scala:213)
at pipelines.runner.Runner$.run(Runner.scala:43)
at pipelines.runner.Runner$.main(Runner.scala:30)
at pipelines.runner.Runner.main(Runner.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:320)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$2(FlinkStreamlet.scala:184)
at scala.util.Try$.apply(Try.scala:213)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:184)
... 20 more

regards.
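Because the top-level exception here is only a wrapper (a java.util.concurrent.ExecutionException with the message "Boxed Error"), the useful information sits further down the cause chain. A small helper like the following sketch, which is hypothetical and not part of Flink or the pipelines code, can print every level of the chain on its own line. That helps when a wrapper ends up logged only through its message or toString, which drops the causes.

```scala
object CauseChain {
  /** Lists a throwable and all of its causes, outermost first, stopping on cycles. */
  def chain(t: Throwable): List[Throwable] = {
    @annotation.tailrec
    def loop(cur: Throwable, acc: List[Throwable]): List[Throwable] =
      if (cur == null || acc.exists(_ eq cur)) acc.reverse
      else loop(cur.getCause, cur :: acc)
    loop(t, Nil)
  }

  /** One line per level, in the spirit of the "Caused by:" sections of a stack trace. */
  def describe(t: Throwable): String =
    chain(t)
      .map(e => s"${e.getClass.getName}: ${e.getMessage}")
      .mkString("\n  caused by: ")
}
```

Applied to an ExecutionException("Boxed Error", cause), describe lists the wrapper first and the original Error on the next line.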

On Mon, Sep 23, 2019 at 5:36 PM Dian Fu <[hidden email]> wrote:
Regarding the code you pasted, personally I don't think anything is wrong with it. The problem is how it's executed. As you can see from the implementation of StreamExecutionEnvironment.getExecutionEnvironment, it may create different StreamExecutionEnvironment implementations in different scenarios. Could you paste the full exception stack trace, if there is one? It's difficult to figure out what's wrong from the current stack trace.

Regards,
Dian

On Sep 23, 2019, at 6:55 PM, Debasish Ghosh <[hidden email]> wrote:

Could it be that the ThreadLocal handling in https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1609 does not behave deterministically when we submit the job through a Kubernetes Flink operator? Utils also selects the factory to create the context based on either thread-local storage or a static mutable variable.

Could these be the source of problems in our case?

regards.
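The lookup described above (a thread-local factory, otherwise a static mutable variable) can be modelled in a few lines. This is a hypothetical simplification, not the actual code of StreamExecutionEnvironment or Utils; EnvLookupModel, EnvFactory and the environment names are invented for illustration. It shows how the same getExecutionEnvironment-style call can return different environments depending on which thread makes it and what was registered beforehand.

```scala
object EnvLookupModel {
  // Hypothetical: a factory that produces an environment, represented here by its name.
  type EnvFactory = () => String

  private val threadLocalFactory = new ThreadLocal[EnvFactory]
  @volatile private var staticFactory: Option[EnvFactory] = None

  def setThreadLocal(f: EnvFactory): Unit = threadLocalFactory.set(f)
  def clearThreadLocal(): Unit = threadLocalFactory.remove()
  def setStatic(f: EnvFactory): Unit = staticFactory = Some(f)
  def clearStatic(): Unit = staticFactory = None

  /** Thread-local factory first, then the static one, otherwise a local default. */
  def getExecutionEnvironment(): String =
    Option(threadLocalFactory.get())
      .orElse(staticFactory)
      .map(factory => factory())
      .getOrElse("local env")
}
```

For example, after setThreadLocal(() => "plan env"), the registering thread gets "plan env", while a freshly started thread falls back to the static factory if one was set, or else to "local env".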

On Mon, Sep 23, 2019 at 3:58 PM Debasish Ghosh <[hidden email]> wrote:
ah .. Ok .. I get the Throwable part. I am using 

import org.apache.flink.streaming.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment

How can this lead to a wrong StreamExecutionEnvironment? Any suggestions?

regards.

On Mon, Sep 23, 2019 at 3:53 PM Dian Fu <[hidden email]> wrote:
Hi Debasish,

As I said before, the exception is caught in [1]. It catches Throwable, so it also catches "OptimizerPlanEnvironment.ProgramAbortException". Regarding the cause of this exception, I agree with Tison: I also think that the wrong StreamExecutionEnvironment is used.

Regards,
Dian
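The mechanism Dian and tison describe can be sketched as a toy model: the planning environment records the plan in execute() and then throws an abort signal, and the caller catches Throwable and returns the recorded plan. All names below are hypothetical and the model is heavily simplified; it is not Flink's OptimizerPlanEnvironment. It also highlights something worth checking in this thread: scala.util.Try treats a plain java.lang.Error subclass as non-fatal, so a Try around execute() catches the abort signal before the caller's catch block can.

```scala
import scala.util.Try

object PlanCaptureModel {
  // Hypothetical abort signal. Extending Error means `case e: Exception` handlers skip it,
  // but scala.util.Try still catches it, because a plain Error subclass counts as non-fatal.
  final class AbortAfterPlan extends Error("plan captured")

  /** Toy planning environment: records the plan, then aborts the user's main. */
  final class PlanEnv {
    @volatile var capturedPlan: Option[String] = None
    def execute(plan: String): Unit = {
      capturedPlan = Some(plan)
      throw new AbortAfterPlan
    }
  }

  /** Toy caller: runs userMain and expects it to end with the abort signal. */
  def getPlan(userMain: PlanEnv => Unit): Either[String, String] = {
    val env = new PlanEnv
    try {
      userMain(env)
      Left("main returned normally; the abort signal never reached the caller")
    } catch {
      case t: Throwable => env.capturedPlan.toRight(s"main failed before execute(): $t")
    }
  }

  // Calls execute() directly: the abort propagates and the caller gets the plan.
  val plainMain: PlanEnv => Unit = env => env.execute("source -> map -> sink")

  // Wraps execute() in Try: the Try swallows the abort signal.
  val wrappedMain: PlanEnv => Unit = env => { Try(env.execute("source -> map -> sink")); () }
}
```

In the model, both a failure before execute() and a main that swallows the signal end up as Left; per tison, real Flink reports an exception thrown from main as a ProgramInvocationException instead.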


On Sep 23, 2019, at 6:08 PM, Debasish Ghosh <[hidden email]> wrote:

Hi Tison -

This is the code that builds the computation graph. readStream reads from Kafka and writeStream writes to Kafka.

    override def buildExecutionGraph = {
      val rides: DataStream[TaxiRide] =
        readStream(inTaxiRide)
          .filter { ride ⇒ ride.getIsStart().booleanValue }
          .keyBy("rideId")

      val fares: DataStream[TaxiFare] =
        readStream(inTaxiFare)
          .keyBy("rideId")

      val processed: DataStream[TaxiRideFare] =
        rides
          .connect(fares)
          .flatMap(new EnrichmentFunction)

      writeStream(out, processed)
    }


I also checked that my code enters this function https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57 and then the exception is thrown. I grepped the Flink code base to see where this exception is caught. Excluding the tests, I don't see it caught anywhere ..

$ find . -name "*.java" | xargs grep "OptimizerPlanEnvironment.ProgramAbortException"
./flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java:import org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException;
./flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java: @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, timeout = 30_000)

What am I missing here?

regards.

On Mon, Sep 23, 2019 at 7:50 AM Zili Chen <[hidden email]> wrote:
Hi Debasish,

As mentioned by Dian, it is an internal exception that should always be caught by
Flink internally. I would suggest you share the job (abstractly). Generally this happens because
you use StreamPlanEnvironment/OptimizerPlanEnvironment directly.

Best,
tison.


On Mon, Sep 23, 2019 at 5:09 AM, Austin Cawley-Edwards <[hidden email]> wrote:
Have you reached out to the FlinkK8sOperator team on Slack? They’re usually pretty active on there. 

Here’s the link:


Best,
Austin

On Sun, Sep 22, 2019 at 12:38 PM Debasish Ghosh <[hidden email]> wrote:
The problem is that I am submitting Flink jobs to a Kubernetes cluster using a Flink Operator. Hence it's difficult to debug in the traditional sense of the term. All I get is the exception that I reported ..

Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)

I think this exception must be caused by some other exceptions, which are not reported, BTW. I expected a Caused By portion in the stack trace. Any clue as to which area I should look into to debug this?

regards.

On Sat, Sep 21, 2019 at 8:10 AM Debasish Ghosh <[hidden email]> wrote:
Thanks for the pointer .. I will try debugging. I am getting this exception running my application on Kubernetes using the Flink operator from Lyft. 

regards.

On Sat, 21 Sep 2019 at 6:44 AM, Dian Fu <[hidden email]> wrote:
This exception is used internally to get the plan of a job before submitting it for execution. It's thrown for a special purpose, is caught internally in [1], and is usually not thrown to end users.

You could check the following to find out the cause of this problem:
1. Check the execution environment you used.
2. If you can debug, set a breakpoint at [2] to see whether the type of the env wrapped in StreamPlanEnvironment is OptimizerPlanEnvironment. Usually it should be.

Regards,
Dian


On Sep 21, 2019, at 4:14 AM, Debasish Ghosh <[hidden email]> wrote:

Hi -

When you get an exception stack trace like this ..

Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)

What is the recommended approach to debugging this? I mean, what kinds of errors can potentially lead to such a stack trace? In my case it starts from env.execute(..) but does not give any information as to what went wrong.

Any help will be appreciated.



Re: Recommended approach to debug this

Debasish Ghosh
Hi Dian -

We submit one job through the operator. We just use the following to complete a promise when the job completes ..

      Try {
        createLogic.executeStreamingQueries(ctx.env)
      }.fold(
        th ⇒ completionPromise.tryFailure(th),
        _ ⇒ completionPromise.trySuccess(Dun)
      )


If we totally do away with the promise and future stuff then we don't get the boxed error - only the exception reported in Caused By.

regards.
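The observation that the boxed error disappears without the promise can be reproduced in isolation. The sketch below mirrors the Try { ... }.fold(...) snippet using only the Scala standard library; AbortSignal and executeQueries are hypothetical stand-ins, and () replaces the snippet's Dun value. It assumes, as the "Boxed Error" message suggests, that the abort exception extends java.lang.Error: the plain Try holds the original Error, while the promise stores it wrapped in a java.util.concurrent.ExecutionException.

```scala
import scala.concurrent.Promise
import scala.util.{Failure, Try}

object TryVersusPromise {
  // Hypothetical stand-in for an abort signal that extends java.lang.Error.
  final class AbortSignal extends Error("abort")

  // Hypothetical stand-in for executeStreamingQueries(...) when execute() aborts.
  def executeQueries(): Unit = throw new AbortSignal

  /** What a plain Try holds: the original Error, not wrapped. */
  def viaTry(): Throwable = Try(executeQueries()) match {
    case Failure(t) => t
    case other      => sys.error(s"unexpected: $other")
  }

  /** What the promise holds after the same Try is folded into it. */
  def viaPromise(): Throwable = {
    val completionPromise = Promise[Unit]()
    Try(executeQueries()).fold(
      th => completionPromise.tryFailure(th),
      _ => completionPromise.trySuccess(())
    )
    completionPromise.future.value match {
      case Some(Failure(t)) => t
      case other            => sys.error(s"unexpected: $other")
    }
  }
}
```

viaTry() returns the AbortSignal itself, while viaPromise() returns an ExecutionException whose getCause is the AbortSignal. The exact wrapper message varies by Scala version, so only the types are worth relying on.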

On Mon, Sep 23, 2019 at 10:20 PM Dian Fu <[hidden email]> wrote:
Hi Debasish,

In which case will the exception occur? Does it occur when you submit one job at a time or when multiple jobs are submitted at the same time? I'm asking this because I noticed that you used Future to execute the job unblocking. I guess ThreadLocal doesn't work well in this case. 

Regards,
Dian

在 2019年9月23日,下午11:57,Debasish Ghosh <[hidden email]> 写道:

Hi tison -

Please find my response below in >>.

regards.

On Mon, Sep 23, 2019 at 6:20 PM Zili Chen <[hidden email]> wrote:
Hi Debasish,

The OptimizerPlanEnvironment.ProgramAbortException should be caught at OptimizerPlanEnvironment#getOptimizedPlan
in its catch (Throwable t) branch.

>> true but what I get is a StreamPlanEnvironment. From my code I am only doing val env = StreamExecutionEnvironment.getExecutionEnvironment

It should always throw a ProgramInvocationException instead of OptimizerPlanEnvironment.ProgramAbortException if any
exception thrown in the main method of your code.

Another important problem is how the code is executed, (set context environment should be another flink internal operation)
but given that you submit the job via flink k8s operator it might require time to take a look at k8s operator implementation.

>> We submit the code through Kubernetes Flink Operator which uses the REST API to submit the job to the Job Manager

However, given we catch Throwable in the place this exception thrown, I highly suspect whether it is executed by an official
flink release.

>> It is an official Flink release 1.9.0 

A completed version of the code and the submission process is helpful. Besides, what is buildExecutionGraph return type,
I think it is not ExecutionGraph in flink...

>> buildExecutionGraph is our function which returns a Unit. It's not ExecutionGraph. It builds the DataStream s by reading from Kafka and then finally writes to Kafka. 

Best,
tison.


Debasish Ghosh <[hidden email]> 于2019年9月23日周一 下午8:21写道:
This is the complete stack trace which we get from execution on Kubernetes using the Flink Kubernetes operator .. The boxed error comes from the fact that we complete a Promise with Success when it returns a JobExecutionResult and with Failure when we get an exception. And here we r getting an exception. So the real stack trace we have is the one below in Caused By. 

java.util.concurrent.ExecutionException: Boxed Error
at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at scala.concurrent.Promise.tryFailure(Promise.scala:112)
at scala.concurrent.Promise.tryFailure$(Promise.scala:112)
at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:187)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3(FlinkStreamlet.scala:186)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3$adapted(FlinkStreamlet.scala:186)
at scala.util.Failure.fold(Try.scala:240)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:187)
at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:153)
at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
at scala.util.Try$.apply(Try.scala:213)
at pipelines.runner.Runner$.run(Runner.scala:43)
at pipelines.runner.Runner$.main(Runner.scala:30)
at pipelines.runner.Runner.main(Runner.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:320)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$2(FlinkStreamlet.scala:184)
at scala.util.Try$.apply(Try.scala:213)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:184)
... 20 more

regards.

On Mon, Sep 23, 2019 at 5:36 PM Dian Fu <[hidden email]> wrote:
Regarding to the code you pasted, personally I think nothing is wrong. The problem is how it's executed. As you can see from the implementation of of StreamExecutionEnvironment.getExecutionEnvironment, it may created different StreamExecutionEnvironment implementations under different scenarios. Could you paste the full exception stack if it exists? It's difficult to figure out what's wrong with the current stack trace.

Regards,
Dian

在 2019年9月23日,下午6:55,Debasish Ghosh <[hidden email]> 写道:

Can it be the case that the threadLocal stuff in https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1609 does not behave deterministically when we submit job through a Kubernetes Flink operator ? Utils also selects the factory to create the context based on either Thread local storage or a static mutable variable. 

Can these be source of problems in our case ?

regards.

On Mon, Sep 23, 2019 at 3:58 PM Debasish Ghosh <[hidden email]> wrote:
ah .. Ok .. I get the Throwable part. I am using 

import org.apache.flink.streaming.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment

How can this lead to a wrong StreamExecutionEnvironment ? Any suggestion ?

regards.

On Mon, Sep 23, 2019 at 3:53 PM Dian Fu <[hidden email]> wrote:
Hi Debasish,

As I said before, the exception is caught in [1]. It catches the Throwable and so it could also catch "OptimizerPlanEnvironment.ProgramAbortException". Regarding to the cause of this exception, I have the same feeling with Tison and I also think that the wrong StreamExecutionEnvironment is used.

Regards,
Dian


在 2019年9月23日,下午6:08,Debasish Ghosh <[hidden email]> 写道:

Hi Tison -

This is the code that builds the computation graph. readStream reads from Kafka and writeStream writes to Kafka.

    override def buildExecutionGraph = {
      val rides: DataStream[TaxiRide] =
        readStream(inTaxiRide)
          .filter { ride ⇒ ride.getIsStart().booleanValue }
          .keyBy("rideId")

      val fares: DataStream[TaxiFare] =
        readStream(inTaxiFare)
          .keyBy("rideId")

      val processed: DataStream[TaxiRideFare] =
        rides
          .connect(fares)
          .flatMap(new EnrichmentFunction)

      writeStream(out, processed)
    }


I also checked that my code enters this function https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57 and then the exception is thrown. I tried to do a grep on the Flink code base to see where this exception is caught. If I take off the tests, I don't see any catch of this exception ..

$ find . -name "*.java" | xargs grep "OptimizerPlanEnvironment.ProgramAbortException"
./flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java:import org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException;
./flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java: @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, timeout = 30_000)

What am I missing here ?

regards.

On Mon, Sep 23, 2019 at 7:50 AM Zili Chen <[hidden email]> wrote:
Hi Debasish,

As mentioned by Dian, it is an internal exception that should be always caught by
Flink internally. I would suggest you share the job(abstractly). Generally it is because
you use StreamPlanEnvironment/OptimizerPlanEnvironment directly.

Best,
tison.


Austin Cawley-Edwards <[hidden email]> 于2019年9月23日周一 上午5:09写道:
Have you reached out to the FlinkK8sOperator team on Slack? They’re usually pretty active on there. 

Here’s the link:


Best,
Austin

On Sun, Sep 22, 2019 at 12:38 PM Debasish Ghosh <[hidden email]> wrote:
The problem is I am submitting Flink jobs to Kubernetes cluster using a Flink Operator. Hence it's difficult to debug in the traditional sense of the term. And all I get is the exception that I reported ..

Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)

I am thinking that this exception must be coming because of some other exceptions, which are not reported BTW. I expected a Caused By portion in the stack trace. Any clue as to which area I should look into to debug this.

regards.

On Sat, Sep 21, 2019 at 8:10 AM Debasish Ghosh <[hidden email]> wrote:
Thanks for the pointer .. I will try debugging. I am getting this exception running my application on Kubernetes using the Flink operator from Lyft. 

regards.

On Sat, 21 Sep 2019 at 6:44 AM, Dian Fu <[hidden email]> wrote:
This exception is used internally to get the plan of a job before submitting it for execution. It's thrown with special purpose and will be caught internally in [1] and will not be thrown to end users usually. 

You could check the following places to find out the cause to this problem:
1. Check the execution environment you used
2. If you can debug, set a breakpoint at[2] to see if the type of the env wrapped in StreamPlanEnvironment is OptimizerPlanEnvironment. Usually it should be.

Regards,
Dian


在 2019年9月21日,上午4:14,Debasish Ghosh <[hidden email]> 写道:

Hi -

When you get an exception stack trace like this ..

Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)

What is the recommended approach to debugging this? What kinds of errors can lead to such a stack trace? In my case it starts from env.execute(..) but gives no information about what went wrong.

Any help will be appreciated.



Re: Recommended approach to debug this

Biao Liu

> We submit the code through Kubernetes Flink Operator which uses the REST API to submit the job to the Job Manager

So you are submitting the job through the REST API, not the Flink client? Could you explain more about this?

Thanks,
Biao /'bɪ.aʊ/



On Tue, 24 Sep 2019 at 03:44, Debasish Ghosh <[hidden email]> wrote:
Hi Dian -

We submit one job through the operator. We just use the following to complete a promise when the job completes:

      Try {
        createLogic.executeStreamingQueries(ctx.env)
      }.fold(
        th ⇒ completionPromise.tryFailure(th),
        _ ⇒ completionPromise.trySuccess(Dun)
      )


If we do away with the promise and future entirely, we don't get the boxed error, only the exception reported in Caused by.

regards.

On Mon, Sep 23, 2019 at 10:20 PM Dian Fu <[hidden email]> wrote:
Hi Debasish,

In which case does the exception occur? Does it occur when you submit one job at a time, or when multiple jobs are submitted at the same time? I'm asking because I noticed that you used a Future to execute the job without blocking, and I guess ThreadLocal doesn't work well in that case.
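The general ThreadLocal concern can be shown with plain Scala, independent of Flink: a value stored in a `ThreadLocal` on the calling thread is not visible to code that a `Future` runs on a pool thread. `contextFactory` is a hypothetical stand-in for a thread-local context setting, not Flink's actual field, and this sketch does not claim that this is what happened in Debasish's job.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ThreadLocalDemo {
  // Hypothetical stand-in for a thread-local "context environment" setting.
  val contextFactory: ThreadLocal[String] = new ThreadLocal[String]

  // Returns (value seen on the calling thread, value seen inside a Future).
  def observe(): (Option[String], Option[String]) = {
    contextFactory.set("plan-environment")
    val onCaller = Option(contextFactory.get())

    // A dedicated pool guarantees the Future body runs on a different thread.
    val pool = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // The pool thread never had the ThreadLocal set, so get() returns null there.
      val inFuture = Await.result(Future(Option(contextFactory.get())), 5.seconds)
      (onCaller, inFuture)
    } finally {
      pool.shutdown()
      contextFactory.remove()
    }
  }
}
```

Calling `ThreadLocalDemo.observe()` returns `(Some("plan-environment"), None)`: anything that resolves its context through a plain `ThreadLocal` sees nothing once the work hops to another thread.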

Regards,
Dian

On Sep 23, 2019, at 11:57 PM, Debasish Ghosh <[hidden email]> wrote:

Hi tison -

Please find my responses below, marked with >>.

regards.

On Mon, Sep 23, 2019 at 6:20 PM Zili Chen <[hidden email]> wrote:
Hi Debasish,

The OptimizerPlanEnvironment.ProgramAbortException should be caught at OptimizerPlanEnvironment#getOptimizedPlan
in its catch (Throwable t) branch.

>> True, but what I get is a StreamPlanEnvironment. In my code I am only doing val env = StreamExecutionEnvironment.getExecutionEnvironment

It should always throw a ProgramInvocationException, not OptimizerPlanEnvironment.ProgramAbortException, if any
exception is thrown in the main method of your code.

Another important question is how the code is executed (setting the context environment should be another Flink-internal operation),
but given that you submit the job via the Flink k8s operator, it may take some time to look into the operator's implementation.

>> We submit the code through Kubernetes Flink Operator which uses the REST API to submit the job to the Job Manager

However, given that we catch Throwable at the place where this exception is thrown, I doubt whether it is being executed by an official
Flink release.

>> It is the official Flink 1.9.0 release.

A complete version of the code and of the submission process would be helpful. Besides, what is the return type of buildExecutionGraph?
I think it is not Flink's ExecutionGraph...

>> buildExecutionGraph is our own function, which returns Unit. It's not an ExecutionGraph. It builds the DataStreams by reading from Kafka and finally writes to Kafka.

Best,
tison.


On Mon, Sep 23, 2019 at 8:21 PM Debasish Ghosh <[hidden email]> wrote:
This is the complete stack trace we get from execution on Kubernetes using the Flink Kubernetes operator. The boxed error comes from the fact that we complete a Promise with Success when execution returns a JobExecutionResult and with Failure when we get an exception. Here we are getting an exception, so the real stack trace is the one below in Caused by.

java.util.concurrent.ExecutionException: Boxed Error
at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at scala.concurrent.Promise.tryFailure(Promise.scala:112)
at scala.concurrent.Promise.tryFailure$(Promise.scala:112)
at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:187)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3(FlinkStreamlet.scala:186)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3$adapted(FlinkStreamlet.scala:186)
at scala.util.Failure.fold(Try.scala:240)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:187)
at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:153)
at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
at scala.util.Try$.apply(Try.scala:213)
at pipelines.runner.Runner$.run(Runner.scala:43)
at pipelines.runner.Runner$.main(Runner.scala:30)
at pipelines.runner.Runner.main(Runner.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:320)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$2(FlinkStreamlet.scala:184)
at scala.util.Try$.apply(Try.scala:213)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:184)
... 20 more
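The `Boxed Error` wrapper in the trace above can be reproduced with the Scala standard library alone. Two steps are visible in the trace: `Try` captures the throwable, because a plain `java.lang.Error` subclass counts as non-fatal for `scala.util.control.NonFatal`, and failing a `Promise` with an `Error` stores a `java.util.concurrent.ExecutionException` that wraps it. `AbortSignal` is a hypothetical stand-in for `ProgramAbortException`. The message text depends on the Scala version (`Boxed Error` matches this trace), so the sketch checks the type and cause rather than the message.

```scala
import java.util.concurrent.ExecutionException
import scala.concurrent.Promise
import scala.util.Try

// Hypothetical stand-in for Flink's ProgramAbortException (an Error subclass).
final class AbortSignal extends Error("abort after plan extraction")

object BoxedErrorDemo {
  // Step 1: Try catches any NonFatal throwable; a plain Error subclass qualifies.
  def captured(t: Throwable): Try[Unit] = Try(throw t)

  // Step 2: fail a fresh promise with `t` and return what the promise actually stored.
  def storedFailure(t: Throwable): Throwable = {
    val p = Promise[Unit]()
    p.tryFailure(t)
    // The promise is already completed, so `value` is Some(Failure(...)).
    p.future.value.get.failed.get
  }
}
```

For an `AbortSignal`, `storedFailure` returns an `ExecutionException` whose `getCause` is the original signal, while an ordinary exception such as `IllegalStateException` is stored unchanged.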

regards.

On Mon, Sep 23, 2019 at 5:36 PM Dian Fu <[hidden email]> wrote:
Regarding the code you pasted, personally I think nothing is wrong with it. The problem is how it's executed. As you can see from the implementation of StreamExecutionEnvironment.getExecutionEnvironment, it may create different StreamExecutionEnvironment implementations in different scenarios. Could you paste the full exception stack trace, if one exists? It's difficult to figure out what's wrong from the current stack trace.

Regards,
Dian

On Sep 23, 2019, at 6:55 PM, Debasish Ghosh <[hidden email]> wrote:

Could it be that the ThreadLocal handling in https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1609 does not behave deterministically when we submit the job through a Kubernetes Flink operator? Utils also selects the factory that creates the context based on either thread-local storage or a static mutable variable.

Could these be the source of problems in our case?

regards.

On Mon, Sep 23, 2019 at 3:58 PM Debasish Ghosh <[hidden email]> wrote:
Ah, OK, I get the Throwable part. I am using:

import org.apache.flink.streaming.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment

How can this lead to the wrong StreamExecutionEnvironment? Any suggestions?

regards.

On Mon, Sep 23, 2019 at 3:53 PM Dian Fu <[hidden email]> wrote:
Hi Debasish,

As I said before, the exception is caught in [1]. It catches Throwable, so it would also catch "OptimizerPlanEnvironment.ProgramAbortException". Regarding the cause of this exception, I share Tison's feeling: I also think the wrong StreamExecutionEnvironment is being used.

Regards,
Dian


On Sep 23, 2019, at 6:08 PM, Debasish Ghosh <[hidden email]> wrote:

Hi Tison -

This is the code that builds the computation graph. readStream reads from Kafka and writeStream writes to Kafka.

    override def buildExecutionGraph = {
      val rides: DataStream[TaxiRide] =
        readStream(inTaxiRide)
          .filter { ride ⇒ ride.getIsStart().booleanValue }
          .keyBy("rideId")

      val fares: DataStream[TaxiFare] =
        readStream(inTaxiFare)
          .keyBy("rideId")

      val processed: DataStream[TaxiRideFare] =
        rides
          .connect(fares)
          .flatMap(new EnrichmentFunction)

      writeStream(out, processed)
    }


I also checked that my code enters this function https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57 and then the exception is thrown. I grepped the Flink code base to see where this exception is caught. Excluding the tests, I don't see any catch of this exception:

$ find . -name "*.java" | xargs grep "OptimizerPlanEnvironment.ProgramAbortException"
./flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java:import org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException;
./flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java: @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, timeout = 30_000)

What am I missing here?

regards.

On Mon, Sep 23, 2019 at 7:50 AM Zili Chen <[hidden email]> wrote:
Hi Debasish,

As mentioned by Dian, it is an internal exception that should always be caught by
Flink internally. I would suggest you share the job (abstractly). Generally this happens because
you use StreamPlanEnvironment/OptimizerPlanEnvironment directly.

Best,
tison.



Re: Recommended approach to debug this

tison
Hi Biao,

The log below already indicates that the job was submitted via the REST API, and I don't think that matters.

at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)

What I don't understand is that Flink DOES catch the exception at the point where it is reported to be thrown...

Best,
tison.


Biao Liu <[hidden email]> 于2019年9月24日周二 上午10:34写道:

> We submit the code through Kubernetes Flink Operator which uses the REST API to submit the job to the Job Manager

So you are submitting job through REST API, not Flink client? Could you explain more about this?

Thanks,
Biao /'bɪ.aʊ/



On Tue, 24 Sep 2019 at 03:44, Debasish Ghosh <[hidden email]> wrote:
Hi Dian -

We submit one job through the operator. We just use the following to complete a promise when the job completes ..

      Try {
        createLogic.executeStreamingQueries(ctx.env)
      }.fold(
        th ⇒ completionPromise.tryFailure(th),
        _ ⇒ completionPromise.trySuccess(Dun)
      )


If we totally do away with the promise and future stuff then we don't get the boxed error - only the exception reported in Caused By.

regards.

On Mon, Sep 23, 2019 at 10:20 PM Dian Fu <[hidden email]> wrote:
Hi Debasish,

In which case will the exception occur? Does it occur when you submit one job at a time or when multiple jobs are submitted at the same time? I'm asking this because I noticed that you used Future to execute the job unblocking. I guess ThreadLocal doesn't work well in this case. 

Regards,
Dian

在 2019年9月23日,下午11:57,Debasish Ghosh <[hidden email]> 写道:

Hi tison -

Please find my response below in >>.

regards.

On Mon, Sep 23, 2019 at 6:20 PM Zili Chen <[hidden email]> wrote:
Hi Debasish,

The OptimizerPlanEnvironment.ProgramAbortException should be caught at OptimizerPlanEnvironment#getOptimizedPlan
in its catch (Throwable t) branch.

>> true but what I get is a StreamPlanEnvironment. From my code I am only doing val env = StreamExecutionEnvironment.getExecutionEnvironment

It should always throw a ProgramInvocationException instead of OptimizerPlanEnvironment.ProgramAbortException if any
exception thrown in the main method of your code.

Another important problem is how the code is executed, (set context environment should be another flink internal operation)
but given that you submit the job via flink k8s operator it might require time to take a look at k8s operator implementation.

>> We submit the code through Kubernetes Flink Operator which uses the REST API to submit the job to the Job Manager

However, given we catch Throwable in the place this exception thrown, I highly suspect whether it is executed by an official
flink release.

>> It is an official Flink release 1.9.0 

A completed version of the code and the submission process is helpful. Besides, what is buildExecutionGraph return type,
I think it is not ExecutionGraph in flink...

>> buildExecutionGraph is our function which returns a Unit. It's not ExecutionGraph. It builds the DataStream s by reading from Kafka and then finally writes to Kafka. 

Best,
tison.


Debasish Ghosh <[hidden email]> 于2019年9月23日周一 下午8:21写道:
This is the complete stack trace which we get from execution on Kubernetes using the Flink Kubernetes operator .. The boxed error comes from the fact that we complete a Promise with Success when it returns a JobExecutionResult and with Failure when we get an exception. And here we r getting an exception. So the real stack trace we have is the one below in Caused By. 

java.util.concurrent.ExecutionException: Boxed Error
at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at scala.concurrent.Promise.tryFailure(Promise.scala:112)
at scala.concurrent.Promise.tryFailure$(Promise.scala:112)
at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:187)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3(FlinkStreamlet.scala:186)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3$adapted(FlinkStreamlet.scala:186)
at scala.util.Failure.fold(Try.scala:240)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:187)
at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:153)
at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
at scala.util.Try$.apply(Try.scala:213)
at pipelines.runner.Runner$.run(Runner.scala:43)
at pipelines.runner.Runner$.main(Runner.scala:30)
at pipelines.runner.Runner.main(Runner.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:320)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$2(FlinkStreamlet.scala:184)
at scala.util.Try$.apply(Try.scala:213)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:184)
... 20 more

regards.

On Mon, Sep 23, 2019 at 5:36 PM Dian Fu <[hidden email]> wrote:
Regarding to the code you pasted, personally I think nothing is wrong. The problem is how it's executed. As you can see from the implementation of of StreamExecutionEnvironment.getExecutionEnvironment, it may created different StreamExecutionEnvironment implementations under different scenarios. Could you paste the full exception stack if it exists? It's difficult to figure out what's wrong with the current stack trace.

Regards,
Dian

在 2019年9月23日,下午6:55,Debasish Ghosh <[hidden email]> 写道:

Can it be the case that the threadLocal stuff in https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1609 does not behave deterministically when we submit job through a Kubernetes Flink operator ? Utils also selects the factory to create the context based on either Thread local storage or a static mutable variable. 

Can these be source of problems in our case ?

regards.

On Mon, Sep 23, 2019 at 3:58 PM Debasish Ghosh <[hidden email]> wrote:
ah .. Ok .. I get the Throwable part. I am using 

import org.apache.flink.streaming.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment

How can this lead to a wrong StreamExecutionEnvironment ? Any suggestion ?

regards.

On Mon, Sep 23, 2019 at 3:53 PM Dian Fu <[hidden email]> wrote:
Hi Debasish,

As I said before, the exception is caught in [1]. It catches the Throwable and so it could also catch "OptimizerPlanEnvironment.ProgramAbortException". Regarding to the cause of this exception, I have the same feeling with Tison and I also think that the wrong StreamExecutionEnvironment is used.

Regards,
Dian


在 2019年9月23日,下午6:08,Debasish Ghosh <[hidden email]> 写道:

Hi Tison -

This is the code that builds the computation graph. readStream reads from Kafka and writeStream writes to Kafka.

    override def buildExecutionGraph = {
      val rides: DataStream[TaxiRide] =
        readStream(inTaxiRide)
          .filter { ride ⇒ ride.getIsStart().booleanValue }
          .keyBy("rideId")

      val fares: DataStream[TaxiFare] =
        readStream(inTaxiFare)
          .keyBy("rideId")

      val processed: DataStream[TaxiRideFare] =
        rides
          .connect(fares)
          .flatMap(new EnrichmentFunction)

      writeStream(out, processed)
    }


I also checked that my code enters this function https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57 and then the exception is thrown. I tried to do a grep on the Flink code base to see where this exception is caught. If I take off the tests, I don't see any catch of this exception ..

$ find . -name "*.java" | xargs grep "OptimizerPlanEnvironment.ProgramAbortException"
./flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java:import org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException;
./flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java: @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, timeout = 30_000)

What am I missing here ?

regards.

On Mon, Sep 23, 2019 at 7:50 AM Zili Chen <[hidden email]> wrote:
Hi Debasish,

As mentioned by Dian, it is an internal exception that should be always caught by
Flink internally. I would suggest you share the job(abstractly). Generally it is because
you use StreamPlanEnvironment/OptimizerPlanEnvironment directly.

Best,
tison.


Austin Cawley-Edwards <[hidden email]> 于2019年9月23日周一 上午5:09写道:
Have you reached out to the FlinkK8sOperator team on Slack? They’re usually pretty active on there. 

Here’s the link:


Best,
Austin

On Sun, Sep 22, 2019 at 12:38 PM Debasish Ghosh <[hidden email]> wrote:
The problem is I am submitting Flink jobs to Kubernetes cluster using a Flink Operator. Hence it's difficult to debug in the traditional sense of the term. And all I get is the exception that I reported ..

Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)

I am thinking that this exception must be coming because of some other exceptions, which are not reported BTW. I expected a Caused By portion in the stack trace. Any clue as to which area I should look into to debug this.

regards.

On Sat, Sep 21, 2019 at 8:10 AM Debasish Ghosh <[hidden email]> wrote:
Thanks for the pointer .. I will try debugging. I am getting this exception running my application on Kubernetes using the Flink operator from Lyft. 

regards.

On Sat, 21 Sep 2019 at 6:44 AM, Dian Fu <[hidden email]> wrote:
This exception is used internally to get the plan of a job before submitting it for execution. It's thrown with special purpose and will be caught internally in [1] and will not be thrown to end users usually. 

You could check the following places to find out the cause to this problem:
1. Check the execution environment you used
2. If you can debug, set a breakpoint at[2] to see if the type of the env wrapped in StreamPlanEnvironment is OptimizerPlanEnvironment. Usually it should be.

Regards,
Dian


在 2019年9月21日,上午4:14,Debasish Ghosh <[hidden email]> 写道:

Hi -

When you get an exception stack trace like this ..

Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)

What is the recommended approach to debugging this? I mean, what kinds of errors can potentially lead to such a stack trace? In my case it starts from env.execute(..) but gives no information about what went wrong.

Any help will be appreciated.



Re: Recommended approach to debug this

Biao Liu
Hi Zili,

Thanks for pointing that out.
I didn't realize that it's a REST API based case. Debasish's case has been discussed not only in this thread...

It's really hard to analyze the case without the full picture.

I think the reason why `ProgramAbortException` is not caught is that he did something outside `env.execute`, like executing this piece of code inside a Scala future.

I guess the scenario is that he is submitting the job through the REST API, but in the main method he wraps `env.execute` in a Scala future instead of calling it directly.
The env has been set to `StreamPlanEnvironment` because `JarHandlerUtils` retrieves the job graph through it.
The `ProgramAbortException` is not propagated, because the Scala future handles this exception.
So retrieving the job graph fails due to an unrecognized exception (Boxed Error).
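To illustrate this point in Java (the thread's code is Scala, so this is an analogy, not the actual code path): Scala's `Promise` turns an `Error` into `java.util.concurrent.ExecutionException("Boxed Error", cause)`, and Java's `CompletableFuture.get()` similarly reports a failure as an `ExecutionException` whose cause is the original throwable. `AbortSignal`, `executeUnderPlanner`, `runInFuture` and `classify` are hypothetical names; `classify` stands in for any caller that recognizes the abort by its type.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical stand-in for a plan-abort signal (an Error subclass).
final class AbortSignal extends Error {}

final class BoxingDemo {
    // Simulates env.execute() under a planning environment: it always aborts.
    static String executeUnderPlanner() {
        throw new AbortSignal();
    }

    // Runs the "job" inside a future and waits for it, like wrapping env.execute in a Future.
    static String runInFuture() throws InterruptedException, ExecutionException {
        return CompletableFuture.supplyAsync(BoxingDemo::executeUnderPlanner).get();
    }

    // What a type-based check in the caller sees.
    static String classify(Throwable t) {
        if (t instanceof AbortSignal) {
            return "abort signal (expected)";
        }
        return "unrecognized: " + t.getClass().getSimpleName();
    }

    public static void main(String[] args) throws InterruptedException {
        try {
            runInFuture();
        } catch (ExecutionException e) {
            System.out.println(classify(e));            // unrecognized: ExecutionException
            System.out.println(classify(e.getCause())); // abort signal (expected)
        }
    }
}
```

A caller that only inspects the outer exception sees `ExecutionException`; the abort signal is visible only through `getCause()`.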

Thanks,
Biao /'bɪ.aʊ/



On Tue, 24 Sep 2019 at 10:44, Zili Chen <[hidden email]> wrote:
Hi Biao,

The log below already shows that the job was submitted via the REST API, and I don't think that matters.

at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)

What I don't understand is that Flink DOES catch the exception at the point where it is reported to be thrown...

Best,
tison.


Biao Liu <[hidden email]> wrote on Tue, Sep 24, 2019 at 10:34 AM:

> We submit the code through Kubernetes Flink Operator which uses the REST API to submit the job to the Job Manager

So you are submitting the job through the REST API, not the Flink client? Could you explain more about this?

Thanks,
Biao /'bɪ.aʊ/



On Tue, 24 Sep 2019 at 03:44, Debasish Ghosh <[hidden email]> wrote:
Hi Dian -

We submit one job through the operator. We just use the following to complete a promise when the job completes:

      Try {
        createLogic.executeStreamingQueries(ctx.env)
      }.fold(
        th ⇒ completionPromise.tryFailure(th),
        _ ⇒ completionPromise.trySuccess(Dun)
      )


If we do away with the promise and future entirely, then we don't get the boxed error, only the exception reported in "Caused by".
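One possible way to keep the promise bookkeeping without hiding the abort from Flink, sketched in Java with `CompletableFuture` standing in for the Scala `Promise`: record every outcome, but re-throw `Error`s unchanged so an outer `catch (Throwable t)` still sees the original exception. This is an assumed pattern, not something confirmed in the thread; `JobRunner`, `runJob` and `AbortSignal` are hypothetical. For context, Scala's `Try` captures any throwable matched by `NonFatal`, which includes plain `Error` subclasses; that is why the `Try { ... }.fold(...)` snippet ends up in `scala.util.Failure.fold` in the stack trace instead of letting the abort propagate.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;

// Hypothetical control-flow signal (an Error subclass).
final class AbortSignal extends Error {}

final class JobRunner {
    // Runs `job` on the calling thread and records the outcome in `done`.
    // Ordinary exceptions are only recorded; Errors (such as the abort signal)
    // are recorded AND re-thrown unchanged, so an outer catch (Throwable) still sees them.
    static <T> void runJob(Callable<T> job, CompletableFuture<T> done) {
        try {
            done.complete(job.call());
        } catch (Error e) {
            done.completeExceptionally(e);
            throw e; // do not swallow control-flow Errors
        } catch (Exception e) {
            done.completeExceptionally(e);
        }
    }
}
```

Running the job on the calling thread matters here: the planner that expects the abort is waiting on that same thread, not on a future.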

regards.

On Mon, Sep 23, 2019 at 10:20 PM Dian Fu <[hidden email]> wrote:
Hi Debasish,

In which case does the exception occur? Does it occur when you submit one job at a time, or when multiple jobs are submitted at the same time? I'm asking because I noticed that you used a Future to execute the job without blocking. I suspect ThreadLocal doesn't work well in this case.

Regards,
Dian

On Sep 23, 2019, at 11:57 PM, Debasish Ghosh <[hidden email]> wrote:

Hi tison -

Please find my responses inline, marked with >>.

regards.

On Mon, Sep 23, 2019 at 6:20 PM Zili Chen <[hidden email]> wrote:
Hi Debasish,

The OptimizerPlanEnvironment.ProgramAbortException should be caught at OptimizerPlanEnvironment#getOptimizedPlan
in its catch (Throwable t) branch.

>> True, but what I get is a StreamPlanEnvironment. In my code I am only doing val env = StreamExecutionEnvironment.getExecutionEnvironment

It should always throw a ProgramInvocationException instead of OptimizerPlanEnvironment.ProgramAbortException if any
exception is thrown in the main method of your code.
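A hypothetical Java model of this rule for a launcher that calls `main` reflectively (the full stack trace in this thread does show `Method.invoke` being called from `PackagedProgram.callMainMethod`). The wrapping policy here, re-throw `Error`s unchanged and wrap everything else in a checked exception, is an assumption for illustration, not Flink's actual code; `MainInvoker`, `InvocationFailed`, `AbortSignal`, `DirectMain` and `BoxedMain` are invented names. `BoxedMain` mimics the situation in this thread, where the abort arrives wrapped in an `ExecutionException`.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.ExecutionException;

// Hypothetical control-flow signal (an Error subclass).
final class AbortSignal extends Error {}

// Hypothetical checked wrapper for failures raised by a user's main method.
final class InvocationFailed extends Exception {
    InvocationFailed(String message, Throwable cause) {
        super(message, cause);
    }
}

final class MainInvoker {
    // Calls `public static void main(String[])` on entryClass reflectively.
    // Assumed policy: Errors thrown by main propagate unchanged; anything else is wrapped.
    static void callMain(Class<?> entryClass, String... args) throws InvocationFailed {
        try {
            Method main = entryClass.getMethod("main", String[].class);
            main.invoke(null, (Object) args);
        } catch (InvocationTargetException e) {
            Throwable cause = e.getTargetException();
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            throw new InvocationFailed("The main method caused an error", cause);
        } catch (ReflectiveOperationException e) {
            throw new InvocationFailed("Could not call main", e);
        }
    }
}

// Entry point whose abort escapes main() directly.
final class DirectMain {
    public static void main(String[] args) {
        throw new AbortSignal();
    }
}

// Entry point whose abort is boxed first, as the Scala Promise does in this thread.
final class BoxedMain {
    public static void main(String[] args) throws Exception {
        throw new ExecutionException("Boxed Error", new AbortSignal());
    }
}
```

Under this model, `DirectMain`'s abort reaches the caller as-is, while `BoxedMain`'s abort arrives as an `InvocationFailed` whose cause chain is `ExecutionException` → `AbortSignal`.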

Another important question is how the code is executed (setting the context environment should be another Flink-internal operation),
but given that you submit the job via the Flink k8s operator, it may take some time to look at the k8s operator implementation.

>> We submit the code through the Kubernetes Flink Operator, which uses the REST API to submit the job to the Job Manager.

However, given that we catch Throwable where this exception is thrown, I strongly doubt that it is being executed by an official
Flink release.

>> It is an official Flink release 1.9.0 

A complete version of the code and the submission process would be helpful. Besides, what is the return type of buildExecutionGraph?
I don't think it is Flink's ExecutionGraph...

>> buildExecutionGraph is our own function and returns Unit. It's not an ExecutionGraph. It builds the DataStreams by reading from Kafka and finally writes to Kafka.

Best,
tison.


Debasish Ghosh <[hidden email]> wrote on Mon, Sep 23, 2019 at 8:21 PM:
This is the complete stack trace we get when executing on Kubernetes with the Flink Kubernetes operator. The boxed error comes from the fact that we complete a Promise with Success when execution returns a JobExecutionResult and with Failure when we get an exception. Here we are getting an exception, so the real stack trace is the one below, under "Caused by".

java.util.concurrent.ExecutionException: Boxed Error
at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at scala.concurrent.Promise.tryFailure(Promise.scala:112)
at scala.concurrent.Promise.tryFailure$(Promise.scala:112)
at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:187)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3(FlinkStreamlet.scala:186)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3$adapted(FlinkStreamlet.scala:186)
at scala.util.Failure.fold(Try.scala:240)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:187)
at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:153)
at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
at scala.util.Try$.apply(Try.scala:213)
at pipelines.runner.Runner$.run(Runner.scala:43)
at pipelines.runner.Runner$.main(Runner.scala:30)
at pipelines.runner.Runner.main(Runner.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:320)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$2(FlinkStreamlet.scala:184)
at scala.util.Try$.apply(Try.scala:213)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:184)
... 20 more

regards.

On Mon, Sep 23, 2019 at 5:36 PM Dian Fu <[hidden email]> wrote:
Regarding the code you pasted, personally I don't think anything is wrong with it. The problem is how it's executed. As you can see from the implementation of StreamExecutionEnvironment.getExecutionEnvironment, it may create different StreamExecutionEnvironment implementations in different scenarios. Could you paste the full exception stack trace, if one exists? It's difficult to figure out what's wrong from the current stack trace.

Regards,
Dian

On Sep 23, 2019, at 6:55 PM, Debasish Ghosh <[hidden email]> wrote:

Could it be that the ThreadLocal logic in https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1609 does not behave deterministically when we submit the job through a Kubernetes Flink operator? Utils also selects the factory that creates the context based on either thread-local storage or a static mutable variable.

Could these be the source of the problem in our case?
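A hypothetical Java model of the selection described here (thread-local factory first, then a static, JVM-wide factory, then a default). It is not Flink's code; `EnvResolver`, `THREAD_FACTORY`, `staticFactory` and the string results are invented. It shows the general property at issue: a value installed in a plain `ThreadLocal` on the launching thread is not visible on a different thread, such as one running a future, which then falls back to whatever the static slot holds.

```java
import java.util.function.Supplier;

// Hypothetical model of "pick the environment from context": a thread-local
// factory wins, then a static (JVM-wide) factory, then a default.
final class EnvResolver {
    static final ThreadLocal<Supplier<String>> THREAD_FACTORY = new ThreadLocal<>();
    static volatile Supplier<String> staticFactory; // shared by all threads

    static String getEnvironment() {
        Supplier<String> f = THREAD_FACTORY.get();
        if (f == null) {
            f = staticFactory;
        }
        return f != null ? f.get() : "local-env";
    }

    // Resolves the environment on a brand-new thread and returns what that thread saw.
    static String resolveOnNewThread() throws InterruptedException {
        String[] seen = new String[1];
        Thread t = new Thread(() -> seen[0] = getEnvironment());
        t.start();
        t.join(); // join() makes the write to seen[0] visible here
        return seen[0];
    }
}
```

In this model, the launching thread resolves `plan-env` once the thread-local factory is set, while a new thread resolves `local-env`, or `static-env` after the static factory is set.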

regards.

On Mon, Sep 23, 2019 at 3:58 PM Debasish Ghosh <[hidden email]> wrote:
Ah, OK, I get the Throwable part. I am using

import org.apache.flink.streaming.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment

How can this lead to the wrong StreamExecutionEnvironment? Any suggestions?

regards.

On Mon, Sep 23, 2019 at 3:53 PM Dian Fu <[hidden email]> wrote:
Hi Debasish,

As I said before, the exception is caught in [1]. That code catches Throwable, so it also catches "OptimizerPlanEnvironment.ProgramAbortException". Regarding the cause of this exception, I agree with Tison: I also think that the wrong StreamExecutionEnvironment is being used.

Regards,
Dian


On Sep 23, 2019, at 6:08 PM, Debasish Ghosh <[hidden email]> wrote:

Hi Tison -

This is the code that builds the computation graph. readStream reads from Kafka and writeStream writes to Kafka.

    override def buildExecutionGraph = {
      val rides: DataStream[TaxiRide] =
        readStream(inTaxiRide)
          .filter { ride ⇒ ride.getIsStart().booleanValue }
          .keyBy("rideId")

      val fares: DataStream[TaxiFare] =
        readStream(inTaxiFare)
          .keyBy("rideId")

      val processed: DataStream[TaxiRideFare] =
        rides
          .connect(fares)
          .flatMap(new EnrichmentFunction)

      writeStream(out, processed)
    }


I also checked that my code enters this function https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57 and then the exception is thrown. I tried grepping the Flink code base to see where this exception is caught. If I exclude the tests, I don't see this exception being caught anywhere:

$ find . -name "*.java" | xargs grep "OptimizerPlanEnvironment.ProgramAbortException"
./flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java:import org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException;
./flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java: @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, timeout = 30_000)

What am I missing here?

regards.

On Mon, Sep 23, 2019 at 7:50 AM Zili Chen <[hidden email]> wrote:
Hi Debasish,

As Dian mentioned, it is an internal exception that should always be caught by
Flink internally. I would suggest you share the job (in abstract form). Generally this happens because
you use StreamPlanEnvironment/OptimizerPlanEnvironment directly.

Best,
tison.


Austin Cawley-Edwards <[hidden email]> wrote on Mon, Sep 23, 2019 at 5:09 AM:
Have you reached out to the FlinkK8sOperator team on Slack? They’re usually pretty active there.

Here’s the link:


Best,
Austin
