Re: Recommended approach to debug this
Posted by
Dian Fu on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Recommended-approach-to-debug-this-tp30101p30139.html
Regarding to the code you pasted, personally I think nothing is wrong. The problem is how it's executed. As you can see from the implementation of of StreamExecutionEnvironment.getExecutionEnvironment, it may created different StreamExecutionEnvironment implementations under different scenarios. Could you paste the full exception stack if it exists? It's difficult to figure out what's wrong with the current stack trace.
Regards,
Dian
On Mon, Sep 23, 2019 at 3:58 PM Debasish Ghosh <
[hidden email]> wrote:
ah .. Ok .. I get the Throwable part. I am using
import org.apache.flink.streaming.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment
How can this lead to a wrong StreamExecutionEnvironment ? Any suggestion ?
regards.
Hi Debasish,
As I said before, the exception is caught in [1]. It catches the Throwable and so it could also catch "OptimizerPlanEnvironment.ProgramAbortException". Regarding to the cause of this exception, I have the same feeling with Tison and I also think that the wrong StreamExecutionEnvironment is used.
Regards,
Dian
Hi Tison -
This is the code that builds the computation graph.
readStream reads from Kafka and
writeStream writes to Kafka.
override def buildExecutionGraph = {
val rides: DataStream[TaxiRide] =
readStream(inTaxiRide)
.filter { ride ⇒ ride.getIsStart().booleanValue }
.keyBy("rideId")
val fares: DataStream[TaxiFare] =
readStream(inTaxiFare)
.keyBy("rideId")
val processed: DataStream[TaxiRideFare] =
rides
.connect(fares)
.flatMap(new EnrichmentFunction)
writeStream(out, processed)
}
$ find . -name "*.java" | xargs grep "OptimizerPlanEnvironment.ProgramAbortException"
./flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java:import org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException;
./flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java: @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, timeout = 30_000)
What am I missing here ?
regards.
Hi Debasish,
As mentioned by Dian, it is an internal exception that should be always caught by
Flink internally. I would suggest you share the job(abstractly). Generally it is because
you use StreamPlanEnvironment/OptimizerPlanEnvironment directly.
Have you reached out to the FlinkK8sOperator team on Slack? They’re usually pretty active on there.
Here’s the link:
Best,
Austin
On Sun, Sep 22, 2019 at 12:38 PM Debasish Ghosh <
[hidden email]> wrote:
The problem is I am submitting Flink jobs to Kubernetes cluster using a Flink Operator. Hence it's difficult to debug in the traditional sense of the term. And all I get is the exception that I reported ..
Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
I am thinking that this exception must be coming because of some other exceptions, which are not reported BTW. I expected a Caused By portion in the stack trace. Any clue as to which area I should look into to debug this.
regards.
On Sat, Sep 21, 2019 at 8:10 AM Debasish Ghosh <
[hidden email]> wrote:
Thanks for the pointer .. I will try debugging. I am getting this exception running my application on Kubernetes using the Flink operator from Lyft.
regards.
This exception is used internally to get the plan of a job before submitting it for execution. It's thrown with special purpose and will be caught internally in [1] and will not be thrown to end users usually.
You could check the following places to find out the cause to this problem:
1. Check the execution environment you used
2. If you can debug, set a breakpoint at[2] to see if the type of the env wrapped in StreamPlanEnvironment is OptimizerPlanEnvironment. Usually it should be.
Regards,
Dian
Hi -
When you get an exception stack trace like this ..
Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
what is the recommended approach of debugging ? I mean what kind of errors can potentially lead to such a stacktrace ? In my case it starts from env.execute(..) but does not give any information as to what can go wrong.
Any help will be appreciated.
--
Sent from my iPhone
--
--
--
--