http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Discarding-header-from-CSV-file-tp6474p6495.html
You don’t need to call execute() method after calling print() method. print() method triggers the execution. The exception is raised because you call execute() after print() method.
> On Apr 27, 2016, at 6:35 PM, nsengupta <
[hidden email]> wrote:
>
> Till,
>
> Thanks for looking into this.
>
> I have removed the toList() from the collect() function, to align the code
> with what I generally do in a Flink application. It throws an Exception, and
> I can't figure out why.
>
> *Here's my code (shortened for brevity):*
>
> case class BuildingInformation(buildingID: Int, buildingManager: Int,
> buildingAge: Int, productID: String, country: String)
>
> object HVACReadingsAnalysis {
>
> def main(args: Array[String]): Unit = {
>
> val envDefault = ExecutionEnvironment.getExecutionEnvironment
>
> val buildings =
> readBuildingInfo(envDefault,"./SensorFiles/building.csv")
>
> buildings.print
>
> envDefault.execute("HVAC Simulation")
> }
>
> private def readBuildingInfo(env: ExecutionEnvironment, inputPath: String)
> = {
>
> // [NS]: I can see the lines, read correctly from the CSV file here
> println("As read from CSV file")
> println(Source.fromFile(inputPath).getLines.toList.mkString("#\n"))
>
> // [NS]: Then, I read the same file using the library function
> env.readCsvFile [BuildingInformation] (
> inputPath,
> ignoreFirstLine = true,
> pojoFields =
> Array("buildingID","buildingManager","buildingAge","productID","country")
> )
> }
>
>
> *Relevant portion of the output:
> *
> As read from CSV file
> BuildingID,BuildingMgr,BuildingAge,HVACproduct,Country#
> 1,M1,25,AC1000,USA#
> 2,M2,27,FN39TG,France#
> 3,M3,28,JDNS77,Brazil#
> 4,M4,17,GG1919,Finland#
> 5,M5,3,ACMAX22,Hong Kong#
> 6,M6,9,AC1000,Singapore#
> 7,M7,13,FN39TG,South Africa#
> 8,M8,25,JDNS77,Australia#
> 9,M9,11,GG1919,Mexico#
> 10,M10,23,ACMAX22,China#
> 11,M11,14,AC1000,Belgium#
> 12,M12,26,FN39TG,Finland#
> 13,M13,25,JDNS77,Saudi Arabia#
> 14,M14,17,GG1919,Germany#
> 15,M15,19,ACMAX22,Israel#
> 16,M16,23,AC1000,Turkey#
> 17,M17,11,FN39TG,Egypt#
> 18,M18,25,JDNS77,Indonesia#
> 19,M19,14,GG1919,Canada#
> 20,M20,19,ACMAX22,Argentina
> 15:34:18,914 INFO org.apache.flink.api.java.ExecutionEnvironment
> - The job has 0 registered types and 0 default Kryo serializers
> 15:34:19,104 INFO org.apache.flink.runtime.minicluster.FlinkMiniCluster
> - Starting FlinkMiniCluster.
> 15:34:19,912 INFO akka.event.slf4j.Slf4jLogger
> - Slf4jLogger started
>
>
> // ..
> // ... more log statements
> // ..
>
> Exception in thread "main" java.lang.RuntimeException: No new data sinks
> have been defined since the last execution. The last execution refers to the
> latest call to 'execute()', 'count()', 'collect()', or 'print()'.
> at
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:979)
> at
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:961)
> at
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:84)
> at
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:652)
> at
> main.scala.hortonworks.tutorial.HVACReadingsAnalysis$.main(HVACReadingsAnalysis.scala:60)
> at
> main.scala.hortonworks.tutorial.HVACReadingsAnalysis.main(HVACReadingsAnalysis.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>
> Process finished with exit code 1
>
>
>
>
> --
> View this message in context:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Discarding-header-from-CSV-file-tp6474p6494.html> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.