Getting the execution plan in local mode inside an IDE


madhu phatak
Hi,
I am trying to get the execution plan for a word count program, using the code below, in local mode inside IntelliJ IDEA. I am using Flink 0.10.0.

import org.apache.flink.api.scala._ // brings the Scala ExecutionEnvironment and implicit TypeInformation into scope

val env = ExecutionEnvironment.getExecutionEnvironment
val data = List("hi", "how are you", "hi")
val dataSet = env.fromCollection(data)
val words = dataSet.flatMap(value => value.split("\\s+"))
val mappedWords = words.map(value => (value, 1))
val grouped = mappedWords.groupBy(0)
val sum = grouped.sum(1)
sum.print()
println(env.getExecutionPlan())

The program computes the sum correctly, but the last line fails with the following exception:

Exception in thread "main" java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'.
at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:925)
at org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:95)
at org.apache.flink.api.scala.ExecutionEnvironment.getExecutionPlan(ExecutionEnvironment.scala:635)
at com.madhukaraphatak.flink.WordCount$.main(WordCount.scala:30)


I tried placing the getExecutionPlan() call in different places, but I get the same error. Is there any other way to get the execution plan in local mode?

--
Regards,
Madhukara Phatak
http://datamantra.io/

Re: Getting the execution plan in local mode inside an IDE

Fabian Hueske-2
Hi,

You can only get the execution plan for programs that have a data sink and have not been executed yet. In your code, print() defines the data sink; however, it also eagerly executes the program. After execution, the program is "removed" from the execution environment, which is why Flink complains that no sink has been defined.

You can print the execution plan if you use a data sink that does not execute eagerly. For example, you can replace the sum.print() statement with sum.output(new DiscardingOutputFormat()) or sum.writeAsText("file:///some/path").
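A minimal sketch of this suggestion applied to the program from the question, assuming Flink 0.10's Scala DataSet API and that DiscardingOutputFormat is the class in org.apache.flink.api.java.io. The object name WordCountPlan, the helper buildPlan, and the output path in the comment are hypothetical, chosen only for illustration.

```scala
import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.scala._

object WordCountPlan {

  // Builds the word count program from the question, but ends it with a lazy
  // sink instead of print(), so nothing is executed before the plan is requested.
  def buildPlan(): String = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val data = List("hi", "how are you", "hi")
    val sum = env.fromCollection(data)
      .flatMap(value => value.split("\\s+"))
      .map(value => (value, 1))
      .groupBy(0)
      .sum(1)

    // Registers a data sink without triggering execution.
    // Alternative lazy sink (hypothetical path): sum.writeAsText("file:///tmp/wordcount-out")
    sum.output(new DiscardingOutputFormat[(String, Int)]())

    // The sink has not been executed yet, so the plan (a JSON string) can be generated.
    env.getExecutionPlan()
  }

  def main(args: Array[String]): Unit = {
    println(buildPlan())
  }
}
```

Note that no word counts are computed here: the program only builds and optimizes the plan, because no call such as execute(), collect() or print() ever runs it.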

Cheers, Fabian



(In reply to madhu phatak's message of 2016-01-01 10:21 GMT+01:00.)