CSV input with unknown # of fields and Custom output format

CSV input with unknown # of fields and Custom output format

Vinh June
Hi Flinkers,
I am totally new to Flink and Scala. I am learning Flink in Scala for a university project and ran into two problems; I would be grateful for any advice.

Problem #1: I want to read CSV files with varying fields (different names and numbers of fields), for example:
file 1: id, name, age
file 2: id, name, [unknown1], [unknown2]
expected result set: id, name, age, [unknown1], [unknown2]

Currently I read each file as an Array, then map the array to a common class holding a Map[header, value], since I need to know which value belongs to which header.
With this method I ran into problem #2, the output format.
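In plain Scala (no Flink involved; `parse` and the sample data below are illustrative, not from the original post), the header-map approach reads roughly as:

```scala
// Sketch of the approach described above: each file's rows are zipped with
// that file's own header into Map[header -> value] records, so records from
// files with different columns can live in one collection.
object HeaderMapSketch {
  type Record = Map[String, String]

  // Parse one file's lines (first line = header) into header-keyed records.
  def parse(lines: Seq[String], delimiter: String = ","): Seq[Record] = {
    val header = lines.head.split(delimiter).map(_.trim)
    lines.tail.map(line => header.zip(line.split(delimiter).map(_.trim)).toMap)
  }

  def main(args: Array[String]): Unit = {
    val file1 = Seq("id,name,age", "1,Alice,30")
    val file2 = Seq("id,name,unknown1,unknown2", "2,Bob,x,y")
    val all   = parse(file1) ++ parse(file2) // one unified "result set"
    // Every header seen across both files:
    println(all.flatMap(_.keys).distinct.sorted.mkString(","))
  }
}
```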

Problem #2: I would like to store binary data, e.g. a DataSet[MyClass] with MyClass(id: Long, array: Array[String]), so that I can read it back later. I found that FileOutputFormat might be the solution, but I can't find any example of how to define one in Scala.


Re: CSV input with unknown # of fields and Custom output format

Fabian Hueske-2
Hi,

Flink provides a CsvInputFormat which returns Tuples of the parsed fields. The format can be configured in several ways (which fields to read, line/field delimiters, comment prefixes, ...). The CsvInputFormat expects that a file has a consistent format, so you could configure one format for each file layout that you need to read and convert their outputs to your common type using a Map function.
If the format of the files is not known ahead of time, you should implement your own input format (probably based on the DelimitedInputFormat).
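The normalization step can be sketched in plain Scala (no Flink APIs; the tuple layouts mirror the two example files, and all names here are made up): each configured format yields tuples of a fixed arity, and a per-file function lifts them into the shared representation.

```scala
object NormalizeSketch {
  // Common target type: field values keyed by header name.
  case class CommonRecord(fields: Map[String, String])

  // One normalizer per known file layout -- what a Map function would do
  // with the fixed-arity tuples that a per-file CsvInputFormat produces.
  def fromFile1(t: (String, String, String)): CommonRecord =
    CommonRecord(Map("id" -> t._1, "name" -> t._2, "age" -> t._3))

  def fromFile2(t: (String, String, String, String)): CommonRecord =
    CommonRecord(Map("id" -> t._1, "name" -> t._2,
                     "unknown1" -> t._3, "unknown2" -> t._4))
}
```

In Flink terms this would be roughly `dataSet1.map(fromFile1) union dataSet2.map(fromFile2)`, after which every record shares the CommonRecord type.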

Flink provides a SerializedOutputFormat that writes data in the binary representation that is also used during processing, e.g., for network transfer and disk spilling. The data can be later read using the SerializedInputFormat.
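The SerializedOutputFormat/SerializedInputFormat pair works with types that can write themselves to and read themselves from a binary view (Flink's IOReadableWritable contract). The underlying idea, a fixed hand-written binary layout, can be sketched with plain java.io streams (no Flink APIs; MyClass here is the example type from the question, with an invented layout):

```scala
import java.io._

// Illustrative only: a type that serializes itself with a fixed binary
// layout (a Long id, then a length-prefixed array of strings).
case class MyClass(id: Long, array: Array[String]) {
  def write(out: DataOutput): Unit = {
    out.writeLong(id)
    out.writeInt(array.length)
    array.foreach(s => out.writeUTF(s))
  }
}

object MyClass {
  def read(in: DataInput): MyClass = {
    val id  = in.readLong()
    val arr = Array.fill(in.readInt())(in.readUTF())
    MyClass(id, arr)
  }
}

object BinaryRoundTrip {
  def main(args: Array[String]): Unit = {
    // Round trip through an in-memory buffer instead of a file.
    val buf = new ByteArrayOutputStream()
    MyClass(42L, Array("a", "b")).write(new DataOutputStream(buf))
    val back = MyClass.read(
      new DataInputStream(new ByteArrayInputStream(buf.toByteArray)))
    println(back.id) // prints 42
  }
}
```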

Best regards, Fabian 

2015-02-03 22:31 GMT+01:00 Vinh June <[hidden email]>:
--
View this message in context: http://apache-flink-incubator-user-mailing-list-archive.2336050.n4.nabble.com/CSV-input-with-unknown-of-fields-and-Custom-output-format-tp670.html
Sent from the Apache Flink (Incubator) User Mailing List archive. mailing list archive at Nabble.com.


Re: CSV input with unknown # of fields and Custom output format

Stephan Ewen
Hi!

Fabian refers to the "TypeSerializerOutputFormat" [1]. You can save your types in efficient binary representation by calling 'dataset.write(new TypeSerializerOutputFormat<Type>(), "/your/file/path"); '

Greetings,
Stephan




On Wed, Feb 4, 2015 at 12:07 AM, Fabian Hueske <[hidden email]> wrote:



Re: CSV input with unknown # of fields and Custom output format

Vinh June
Hello Fabian and Stephan,

Thank you guys for your reply

@Fabian: Could you write a small dummy example using SerializedOutputFormat and SerializedInputFormat? I tried the following:

dataset.write(new SerializedOutputFormat[MyClass], dataPath)

but it doesn't work: it throws an error because type arguments [...] do not conform to class SerializedOutputFormat's type parameter bounds.

I tried Stephan's advice to use TypeSerializerOutputFormat; it seems to work, but I do not know how to read the output back in.

@Stephan: writing with dataset.write(new TypeSerializerOutputFormat[MyClass], outputDataPath) works for me, but when I tried the analogous call to read it back:

val readback = env.readFile[MyClass](new TypeSerializerInputFormat[MyClass], dataPath)

this fails with an "unspecified value parameter x$1" error.

When I add a TypeSerializer argument as IntelliJ suggests:

val readback = env.readFile[MyClass](new TypeSerializerInputFormat[MyClass](TypeSerializer[MyClass]), dataPath)

it says that TypeSerializer is not a value.

I couldn't figure out a solution from the debug log. Is there any working example of using these in Scala?

Re: CSV input with unknown # of fields and Custom output format

Stephan Ewen
Hi!

I would go with the TypeSerializerInputFormat.


Here is a code sample (in Java, Scala should work the same way):

------------------------------------------------------------

DataSet<MyType> dataSet = ...;

// write it out
dataSet.write(new TypeSerializerOutputFormat<MyType>(), "path");

// read it in
DataSet<MyType> read = env.readFile(new TypeSerializerInputFormat<MyType>(dataSet.getType()), "path");

------------------------------------------------------------

The important thing to note is that the TypeSerializerInputFormat needs the TypeInformation of the type to read (in order to know how to deserialize it).

The simplest way to make sure that the type information used for reading is the same as that used for writing is to take the type information
from the written data set (by calling getType() on the data set).


Greetings,
Stephan


On Wed, Feb 4, 2015 at 11:18 AM, Vinh June <[hidden email]> wrote:


Re: CSV input with unknown # of fields and Custom output format

Vinh June
Thanks,
I just tried it and it works with Scala as well.

A small note for anyone who might be interested: the constructor of TypeSerializerInputFormat needs a TypeSerializer, not a TypeInformation. So this works in Scala:
----------------------------------------
[SCALA]
val readback = env
  .readFile[MyClass](
    new TypeSerializerInputFormat[MyClass](dataSet.getType.createSerializer()),
    dataPath)
----------------------------------------

If you separate the write and the readFile into different programs (in the code above I used env.readFile), then dataSet is not available on the reading side; in that case, create the serializer from the class definition as below:
----------------------------------------
[SCALA]
val readback = env
  .readFile[MyClass](
    new TypeSerializerInputFormat[MyClass](createTypeInformation[MyClass].createSerializer()),
    dataPath)
----------------------------------------

Again, thank you @Stephan

Re: CSV input with unknown # of fields and Custom output format

Stephan Ewen
Nice!

BTW: the TypeSerializerInputFormat just changed (in the 0.9-SNAPSHOT master) so that it now takes the TypeInformation rather than a TypeSerializer.

Stephan


On Wed, Feb 4, 2015 at 11:52 AM, Vinh June <[hidden email]> wrote: