Re: Discarding header from CSV file

Posted by Chiwan Park-2 on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Discarding-header-from-CSV-file-tp6474p6495.html

Hi,

You don’t need to call execute() after print(). The print() method triggers the execution by itself, so the exception is raised because you call execute() afterwards without having defined any new data sink.
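For illustration, here is a minimal sketch of the two patterns that should work, assuming the Scala DataSet API. The object name, the in-memory sample data and the output path are made up for this example and are not taken from your program:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

// Hypothetical object name, for illustration only.
object PrintOrExecute {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Made-up in-memory data standing in for the CSV input.
    val buildings = env.fromElements((1, "USA"), (2, "France"))

    // Pattern 1: print() triggers the execution itself,
    // so no env.execute() call follows it.
    buildings.print()

    // Pattern 2: define a lazy sink first, then call execute() to run it.
    // The output path is an assumption; OVERWRITE avoids a failure on reruns.
    buildings.writeAsText("/tmp/buildings-out", WriteMode.OVERWRITE)
    env.execute("HVAC Simulation")
  }
}
```

In this sketch the execute() call is fine only because a new sink (writeAsText) was defined after print(). If you remove the writeAsText line, you should see the same "No new data sinks have been defined" exception again.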

Regards,
Chiwan Park

> On Apr 27, 2016, at 6:35 PM, nsengupta <[hidden email]> wrote:
>
> Till,
>
> Thanks for looking into this.
>
> I have removed the toList() from the collect() function, to align the code
> with what I generally do in a Flink application. It throws an Exception, and
> I can't figure out why.
>
> *Here's my code (shortened for brevity):*
>
> case class BuildingInformation(buildingID: Int, buildingManager: Int,
> buildingAge: Int, productID: String, country: String)
>
> object HVACReadingsAnalysis {
>
>  def main(args: Array[String]): Unit = {
>
>    val envDefault = ExecutionEnvironment.getExecutionEnvironment
>
>    val buildings =
> readBuildingInfo(envDefault,"./SensorFiles/building.csv")
>
>    buildings.print
>
>    envDefault.execute("HVAC Simulation")
>  }
>
>  private def readBuildingInfo(env: ExecutionEnvironment, inputPath: String)
> = {
>
>   // [NS]: I can see the lines, read correctly from the CSV file here
>    println("As read from CSV file")
>    println(Source.fromFile(inputPath).getLines.toList.mkString("#\n"))
>
>    // [NS]: Then, I read the same file using the library function
>   env.readCsvFile [BuildingInformation] (
>      inputPath,
>      ignoreFirstLine = true,
>      pojoFields =
> Array("buildingID","buildingManager","buildingAge","productID","country")
>    )
>  }
>
>
> *Relevant portion of the output:*
> As read from CSV file
> BuildingID,BuildingMgr,BuildingAge,HVACproduct,Country#
> 1,M1,25,AC1000,USA#
> 2,M2,27,FN39TG,France#
> 3,M3,28,JDNS77,Brazil#
> 4,M4,17,GG1919,Finland#
> 5,M5,3,ACMAX22,Hong Kong#
> 6,M6,9,AC1000,Singapore#
> 7,M7,13,FN39TG,South Africa#
> 8,M8,25,JDNS77,Australia#
> 9,M9,11,GG1919,Mexico#
> 10,M10,23,ACMAX22,China#
> 11,M11,14,AC1000,Belgium#
> 12,M12,26,FN39TG,Finland#
> 13,M13,25,JDNS77,Saudi Arabia#
> 14,M14,17,GG1919,Germany#
> 15,M15,19,ACMAX22,Israel#
> 16,M16,23,AC1000,Turkey#
> 17,M17,11,FN39TG,Egypt#
> 18,M18,25,JDNS77,Indonesia#
> 19,M19,14,GG1919,Canada#
> 20,M20,19,ACMAX22,Argentina
> 15:34:18,914 INFO  org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
> 15:34:19,104 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster - Starting FlinkMiniCluster.
> 15:34:19,912 INFO  akka.event.slf4j.Slf4jLogger - Slf4jLogger started
>
>
> // ..
> // ... more log statements
> // ..
>
> Exception in thread "main" java.lang.RuntimeException: No new data sinks
> have been defined since the last execution. The last execution refers to the
> latest call to 'execute()', 'count()', 'collect()', or 'print()'.
> at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:979)
> at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:961)
> at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:84)
> at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:652)
> at main.scala.hortonworks.tutorial.HVACReadingsAnalysis$.main(HVACReadingsAnalysis.scala:60)
> at main.scala.hortonworks.tutorial.HVACReadingsAnalysis.main(HVACReadingsAnalysis.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>
> Process finished with exit code 1
>
>
>
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Discarding-header-from-CSV-file-tp6474p6494.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.