Discarding header from CSV file

nsengupta
What is the recommended way of discarding the column header(s) from a CSV file, if I am using the

environment.readTextFile(....)

facility? Obviously, we don't know beforehand which of the nodes will read the header(s), so we cannot use the usual tricks like drop(1).

I don't recall well: has this been discussed and closed earlier in this forum? If so, can someone point that out to me please?

-- Nirmalya

Re: Discarding header from CSV file

Chiwan Park-2
Hi, Nirmalya

I recommend the readCsvFile() method rather than readTextFile() to read a CSV file. readCsvFile() provides some features for CSV files such as ignoreFirstLine() (what you are looking for), ignoreComments(), etc.

If you have to use the readTextFile() method, I think you can ignore the column headers by calling the zipWithIndex method and filtering based on the index.
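A minimal sketch of both approaches (assuming the Scala DataSet API, a hypothetical two-column file `data.csv`, and the `zipWithIndex` helper from `org.apache.flink.api.scala.utils`):

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._ // brings in zipWithIndex

case class Row(id: Int, name: String) // hypothetical schema

object SkipHeaderSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Preferred: readCsvFile() drops the header line itself
    val rows = env.readCsvFile[Row]("data.csv", ignoreFirstLine = true)

    // Fallback for readTextFile(): index every line, then drop index 0.
    // Note: zipWithIndex needs an extra pass to count elements per partition.
    val noHeader = env.readTextFile("data.csv")
      .zipWithIndex
      .filter { case (index, _) => index > 0 }
      .map { case (_, line) => line }

    rows.print()
  }
}
```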

Regards,
Chiwan Park


Re: Discarding header from CSV file

nsengupta
Hello Chiwan,

I was just about to post to declare my ignorance, because I searched again and realized that I had failed to spot readCsvFile! :-) You have been faster than me!

Yes, I should use readCsvFile so that I get all the facilities built in.

Many thanks for pointing out.



Re: Discarding header from CSV file

nsengupta
Chiwan and other Flinksters,

I am stuck with the following. Somehow, I am unable to spot the error, if any! Please help.

I have this case class:
case class BuildingInformation(buildingID: Int, buildingManager: Int, buildingAge: Int, productID: String, country: String)

I intend to read from a CSV file which has a one-line header:
BuildingID,BuildingMgr,BuildingAge,HVACproduct,Country

I attempt to read the file in this manner:
private def readBuildingInfo(env: ExecutionEnvironment, inputPath: String) = {
  env.readCsvFile[BuildingInformation](
    inputPath,
    ignoreFirstLine = true,
    pojoFields = Array("buildingID", "buildingManager", "buildingAge", "productID", "country")
  )
}

Then, I use this function in the driver's main():
val envDefault = ExecutionEnvironment.getExecutionEnvironment
val buildings = readBuildingInfo(envDefault,"./SensorFiles/building.csv").collect().toList

The 'buildings' list is always empty!

I can't figure out why! I have checked that the path of the CSV file is correct and accessible. Also, I can read the same data by following the usual method of reading each line as text, parsing the commas, and creating the POJOs (case classes).


-- Nirmalya


Re: Discarding header from CSV file

Till Rohrmann
Hi Nirmalya,

I tried to reproduce your problem but was not successful. For me it worked to read a CSV file and fill the values into case classes. Could you maybe compile an example program with sample input to reproduce your problem?

Cheers,
Till



Re: Discarding header from CSV file

nsengupta
Till,

Thanks for looking into this.

I have removed the toList() from the collect() function, to align the code with what I generally do in a Flink application. It throws an Exception, and I can't figure out why.

Here's my code (shortened for brevity):

case class BuildingInformation(buildingID: Int, buildingManager: Int, buildingAge: Int, productID: String, country: String)

object HVACReadingsAnalysis {

  def main(args: Array[String]): Unit = {

    val envDefault = ExecutionEnvironment.getExecutionEnvironment

    val buildings = readBuildingInfo(envDefault,"./SensorFiles/building.csv")

    buildings.print

    envDefault.execute("HVAC Simulation")
  }

  private def readBuildingInfo(env: ExecutionEnvironment, inputPath: String) = {

    // [NS]: I can see the lines, read correctly from the CSV file here
    println("As read from CSV file")
    println(Source.fromFile(inputPath).getLines.toList.mkString("#\n"))

    // [NS]: Then, I read the same file using the library function
    env.readCsvFile[BuildingInformation](
      inputPath,
      ignoreFirstLine = true,
      pojoFields = Array("buildingID", "buildingManager", "buildingAge", "productID", "country")
    )
  }
}


Relevant portion of the output:

As read from CSV file
BuildingID,BuildingMgr,BuildingAge,HVACproduct,Country#
1,M1,25,AC1000,USA#
2,M2,27,FN39TG,France#
3,M3,28,JDNS77,Brazil#
4,M4,17,GG1919,Finland#
5,M5,3,ACMAX22,Hong Kong#
6,M6,9,AC1000,Singapore#
7,M7,13,FN39TG,South Africa#
8,M8,25,JDNS77,Australia#
9,M9,11,GG1919,Mexico#
10,M10,23,ACMAX22,China#
11,M11,14,AC1000,Belgium#
12,M12,26,FN39TG,Finland#
13,M13,25,JDNS77,Saudi Arabia#
14,M14,17,GG1919,Germany#
15,M15,19,ACMAX22,Israel#
16,M16,23,AC1000,Turkey#
17,M17,11,FN39TG,Egypt#
18,M18,25,JDNS77,Indonesia#
19,M19,14,GG1919,Canada#
20,M20,19,ACMAX22,Argentina
15:34:18,914 INFO  org.apache.flink.api.java.ExecutionEnvironment                - The job has 0 registered types and 0 default Kryo serializers
15:34:19,104 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster         - Starting FlinkMiniCluster.
15:34:19,912 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started


// ..
// ... more log statements
// ..

Exception in thread "main" java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'.
        at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:979)
        at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:961)
        at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:84)
        at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:652)
        at main.scala.hortonworks.tutorial.HVACReadingsAnalysis$.main(HVACReadingsAnalysis.scala:60)
        at main.scala.hortonworks.tutorial.HVACReadingsAnalysis.main(HVACReadingsAnalysis.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

Process finished with exit code 1

Re: Discarding header from CSV file

Chiwan Park-2
Hi,

You don’t need to call the execute() method after calling print(); print() itself triggers the execution. The exception is raised because you call execute() after print(), when no new sinks remain to be executed.
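For illustration, the two valid patterns might look like this (a sketch, assuming the Scala DataSet API):

```scala
import org.apache.flink.api.scala._

object SinkPatternsSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromElements(1, 2, 3)

    // Pattern 1: print() is an eager sink -- it triggers execution itself,
    // so no env.execute() should follow it.
    data.print()

    // Pattern 2: a lazy sink such as writeAsText() only defines a sink;
    // an explicit env.execute() is required to run the job.
    data.writeAsText("/tmp/out") // hypothetical output path
    env.execute("sink example")
  }
}
```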

Regards,
Chiwan Park



Re: Discarding header from CSV file

nsengupta
Hello Chiwan,

Yes, that's an oversight on my part. In my hurry, I didn't even try to explore the source of that Exception. Thanks, again.

However, I still don't know why I am not able to read the CSV file. As the output shows, I can read the same file using standard IO routines anyway.

Could you spot my mistake?

-- Nirmalya

Re: Discarding header from CSV file

Chiwan Park-2
It seems that the type of `buildingManager` does not match the CSV column. In your source code, `buildingManager` is defined as `Int`, but in your CSV file its values start with the character `M`.

I succeeded in running the code with your CSV file after changing the type of `buildingManager` to `String`.
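Putting the fix together, a working version might look roughly like this (a sketch; only the type of `buildingManager` changes):

```scala
import org.apache.flink.api.scala._

// buildingManager is now a String, matching CSV values such as "M1"
case class BuildingInformation(buildingID: Int, buildingManager: String,
                               buildingAge: Int, productID: String, country: String)

object FixedReaderSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val buildings = env.readCsvFile[BuildingInformation](
      "./SensorFiles/building.csv",
      ignoreFirstLine = true,
      pojoFields = Array("buildingID", "buildingManager", "buildingAge", "productID", "country")
    )
    buildings.print() // print() is the sink and triggers execution
  }
}
```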

Regards,
Chiwan Park



Re: Discarding header from CSV file

nsengupta
Hello Chiwan,

Sorry for the late reply. I have been into other things for some time.

Yes, you are right. I had wrongly assumed that field to be an Int.

I will fix it and give it a go again.

Many thanks again.

-- Nirmalya