What is the recommended way of discarding the column header(s) from a CSV file if I am using the
environment.readTextFile(....) facility? Obviously, we don't know beforehand which of the nodes will read the header(s), so we cannot use the usual tricks like drop(1). I don't recall well: has this been discussed and closed earlier in this forum? If so, can someone point that out to me, please? -- Nirmalya
Hi, Nirmalya
I recommend the readCsvFile() method rather than readTextFile() to read a CSV file. readCsvFile() provides some features for CSV files, such as ignoreFirstLine() (what you are looking for), ignoreComments(), etc. If you have to use the readTextFile() method, I think you can ignore the column headers by calling the zipWithIndex method and filtering on the index. Regards, Chiwan Park
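For the archives, both suggestions can be sketched roughly as below. This is a minimal sketch against the Flink Scala DataSet API, not tested code; the case class, field names, and file path are made up for illustration, and the zipWithIndex import is assumed to come from the DataSet utilities package.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._  // assumed: provides zipWithIndex on DataSet

case class Record(id: Int, name: String)   // hypothetical schema for illustration

object SkipHeaderSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Option 1: let readCsvFile skip the header line for you.
    val records = env.readCsvFile[Record](
      "/path/to/data.csv",
      ignoreFirstLine = true)
    records.print()

    // Option 2 (if stuck with readTextFile): index every line and drop index 0.
    // zipWithIndex yields (index, line) pairs, so only the very first line of
    // the file is removed, regardless of which node reads which split.
    val lines = env.readTextFile("/path/to/data.csv")
      .zipWithIndex
      .filter { case (idx, _) => idx > 0L }
      .map { case (_, line) => line }
  }
}
```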
Hello Chiwan, I was just about to post to declare my ignorance, because I searched again and realized that I had failed to spot readCsvFile! :-) You have been faster than me. Yes, I should use readCsvFile so that I get all the facilities built in. Many thanks for pointing it out. -- Nirmalya
Chiwan and other Flinksters,
I am stuck with the following. Somehow, I am unable to spot the error, if any! Please help.

I have this case class:

case class BuildingInformation(buildingID: Int, buildingManager: Int, buildingAge: Int, productID: String, country: String)

I intend to read from a CSV file which has a one-line header:

BuildingID,BuildingMgr,BuildingAge,HVACproduct,Country

I attempt to read the file in this manner:

private def readBuildingInfo(env: ExecutionEnvironment, inputPath: String) = {
  env.readCsvFile[BuildingInformation](
    inputPath,
    ignoreFirstLine = true,
    pojoFields = Array("buildingID","buildingManager","buildingAge","productID","country")
  )
}

Then, I use this function in the driver's main():

val envDefault = ExecutionEnvironment.getExecutionEnvironment
val buildings = readBuildingInfo(envDefault,"./SensorFiles/building.csv").collect().toList

The 'buildings' list is always empty! I fail to figure out why. I have checked that the path of the CSV file is correct and accessible. Also, I can read the same stuff by following the usual method of reading each line as text, parsing the commas and creating the POJOs (case classes). -- Nirmalya
Hi Nirmalya, I tried to reproduce your problem but was not successful. For me it worked to read a CSV file and fill in the values into case classes. Could you maybe compile an example program with sample input to reproduce your problem? Cheers, Till
Till,
Thanks for looking into this. I have removed the collect().toList call, to align the code with what I generally do in a Flink application. It throws an exception, and I can't figure out why.

Here's my code (shortened for brevity):

case class BuildingInformation(buildingID: Int, buildingManager: Int, buildingAge: Int, productID: String, country: String)

object HVACReadingsAnalysis {

  def main(args: Array[String]): Unit = {
    val envDefault = ExecutionEnvironment.getExecutionEnvironment
    val buildings = readBuildingInfo(envDefault,"./SensorFiles/building.csv")
    buildings.print
    envDefault.execute("HVAC Simulation")
  }

  private def readBuildingInfo(env: ExecutionEnvironment, inputPath: String) = {
    // [NS]: I can see the lines, read correctly from the CSV file here
    println("As read from CSV file")
    println(Source.fromFile(inputPath).getLines.toList.mkString("#\n"))

    // [NS]: Then, I read the same file using the library function
    env.readCsvFile[BuildingInformation](
      inputPath,
      ignoreFirstLine = true,
      pojoFields = Array("buildingID","buildingManager","buildingAge","productID","country")
    )
  }
}

Relevant portion of the output:

As read from CSV file
BuildingID,BuildingMgr,BuildingAge,HVACproduct,Country#
1,M1,25,AC1000,USA#
2,M2,27,FN39TG,France#
3,M3,28,JDNS77,Brazil#
4,M4,17,GG1919,Finland#
5,M5,3,ACMAX22,Hong Kong#
6,M6,9,AC1000,Singapore#
7,M7,13,FN39TG,South Africa#
8,M8,25,JDNS77,Australia#
9,M9,11,GG1919,Mexico#
10,M10,23,ACMAX22,China#
11,M11,14,AC1000,Belgium#
12,M12,26,FN39TG,Finland#
13,M13,25,JDNS77,Saudi Arabia#
14,M14,17,GG1919,Germany#
15,M15,19,ACMAX22,Israel#
16,M16,23,AC1000,Turkey#
17,M17,11,FN39TG,Egypt#
18,M18,25,JDNS77,Indonesia#
19,M19,14,GG1919,Canada#
20,M20,19,ACMAX22,Argentina

15:34:18,914 INFO  org.apache.flink.api.java.ExecutionEnvironment        - The job has 0 registered types and 0 default Kryo serializers
15:34:19,104 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster - Starting FlinkMiniCluster.
15:34:19,912 INFO  akka.event.slf4j.Slf4jLogger                          - Slf4jLogger started
// ..
// ... more log statements
// ..

Exception in thread "main" java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'.
    at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:979)
    at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:961)
    at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:84)
    at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:652)
    at main.scala.hortonworks.tutorial.HVACReadingsAnalysis$.main(HVACReadingsAnalysis.scala:60)
    at main.scala.hortonworks.tutorial.HVACReadingsAnalysis.main(HVACReadingsAnalysis.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

Process finished with exit code 1
Hi,
You don’t need to call the execute() method after calling print(); print() itself triggers the execution. The exception is raised because you call execute() after print(), at which point no new data sink has been defined. Regards, Chiwan Park
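The print-versus-execute point above can be sketched minimally. This is an illustrative, untested sketch against the Flink Scala DataSet API; the job name and sample data are made up.

```scala
import org.apache.flink.api.scala._

object PrintTriggersExecution {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromElements(1, 2, 3)

    // print() adds a sink AND runs the program, so the plan is consumed here.
    data.print()

    // Calling env.execute("job") here would throw
    // "No new data sinks have been defined since the last execution",
    // because print() already executed the only sink in the plan.
  }
}
```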
Hello Chiwan,
Yes, that's an oversight on my part. In my hurry, I didn't even try to explore the source of that exception. Thanks, again. However, I still don't know why I am unable to read the CSV file. As the output shows, using standard IO routines, I can read the same file anyway. Could you spot my mistake? -- Nirmalya
It seems that the type of `buildingManager` does not match the CSV column. In the source code, `buildingManager` is declared as `Int`, but in your CSV file the value starts with the character `M`.
I succeeded in running the code with your CSV file after changing the type of `buildingManager` to `String`. Regards, Chiwan Park
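Putting the fix together, the corrected program might look roughly like this — a sketch based on the code posted earlier in the thread, with `buildingManager` changed to `String`; it has not been re-run here.

```scala
import org.apache.flink.api.scala._

// buildingManager is String now, matching CSV values such as "M1", "M2", ...
case class BuildingInformation(buildingID: Int, buildingManager: String,
                               buildingAge: Int, productID: String, country: String)

object ReadBuildings {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // ignoreFirstLine drops the one-line header; pojoFields maps CSV columns
    // to the case-class fields in order.
    val buildings = env.readCsvFile[BuildingInformation](
      "./SensorFiles/building.csv",
      ignoreFirstLine = true,
      pojoFields = Array("buildingID", "buildingManager", "buildingAge",
                         "productID", "country"))

    buildings.print()  // triggers execution; no separate env.execute() needed
  }
}
```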
Hello Chiwan,
Sorry for the late reply; I have been busy with other things for some time. Yes, you are right: I have been wrongly assuming that field to be an Int. I will fix it and give it another go. Many thanks again. -- Nirmalya