Hi Flinkers,
I am totally new to Flink and Scala. I am studying Flink in Scala for a university project and have run into two problems, so any advice would be great.

Problem #1: I want to read CSV files with varying fields (different names and numbers of fields), for example:

file 1: id, name, age
file 2: id, name, [unknown1], [unknown2]
expected result set: id, name, age, [unknown1], [unknown2]

Currently I read each file as an Array, then map the array to a common class holding a Map[header, value] (since I need to know which value belongs to which header). With this approach I ran into problem #2, with the output format.

Problem #2: I would like to store binary data so that I can read it back later, for example a DataSet[MyClass] where MyClass has the fields id: Long and array: Array[String]. I found that FileOutputFormat might be the solution, but I can't find any example of how to define one in Scala.
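The Map[header, value] approach described above can be sketched in plain Scala, without any Flink dependency. This is a minimal, hypothetical sketch: the names `CsvFile`, `parseCsv`, `unionHeader` and `toRow` are made up for illustration, and the naive comma split does not handle quoted fields. Inside a Flink job, the per-line conversion would typically run in a map function.

```scala
// Hypothetical sketch in plain Scala (no Flink): each CSV row becomes a
// Map[header, value], and all files are projected onto the union of headers.
final case class CsvFile(header: List[String], rows: List[Map[String, String]])

object HeaderMerge {
  // First non-empty line is the header; every other line is one record.
  // Naive comma split: quoted fields containing commas are NOT supported.
  def parseCsv(text: String): CsvFile = {
    val lines = text.split("\n").iterator.map(_.trim).filter(_.nonEmpty).toList
    val header = lines.head.split(",").map(_.trim).toList
    val rows = lines.tail.map { line =>
      val values = line.split(",", -1).map(_.trim).toList
      header.zip(values).toMap // pair each value with its column name
    }
    CsvFile(header, rows)
  }

  // Union of all headers, keeping the order in which they first appear.
  def unionHeader(files: Seq[CsvFile]): List[String] =
    files.flatMap(_.header).distinct.toList

  // Project one record onto the common schema; missing columns become "".
  def toRow(schema: List[String], record: Map[String, String]): List[String] =
    schema.map(h => record.getOrElse(h, ""))

  def main(args: Array[String]): Unit = {
    val file1 = parseCsv("id,name,age\n1,Alice,30")
    val file2 = parseCsv("id,name,unknown1,unknown2\n2,Bob,x,y")
    val schema = unionHeader(Seq(file1, file2))
    println(schema.mkString(","))
    for (f <- Seq(file1, file2); r <- f.rows) println(toRow(schema, r).mkString(","))
  }
}
```

Keeping the header lists separately from the row maps is what preserves the column order of the expected result set, since a Map alone does not remember it.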
Hi,

Flink provides a CsvInputFormat which returns Tuples of the parsed fields. The format can be configured in several ways (which fields to read, line/field delimiters, comment prefixes, ...). The CsvInputFormat expects a file to have a consistent format. You could configure one format for each file layout that you need to read and convert their outputs to your common type using a Map function. If the format of the files is not known in advance, you should implement your own input format (probably based on the DelimitedInputFormat).

Flink provides a SerializedOutputFormat that writes data in the binary representation that is also used during processing, e.g., for network transfer and disk spilling. The data can later be read using the SerializedInputFormat.

Best regards,
Fabian

2015-02-03 22:31 GMT+01:00 Vinh June:
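The "one format per layout, then a Map function" suggestion can be illustrated without Flink by writing one conversion function per file layout, each taking the tuple that a per-layout CSV reader might produce. The common type `CommonRecord` and the function names are hypothetical, and plain Seqs stand in for DataSets here.

```scala
// Hypothetical common type: the shared columns plus a map of layout-specific extras.
final case class CommonRecord(id: Long, name: String, extra: Map[String, String])

object CommonType {
  // Layout of file 1: (id, name, age).
  def fromLayout1(t: (Long, String, Int)): CommonRecord =
    CommonRecord(t._1, t._2, Map("age" -> t._3.toString))

  // Layout of file 2: (id, name, unknown1, unknown2).
  def fromLayout2(t: (Long, String, String, String)): CommonRecord =
    CommonRecord(t._1, t._2, Map("unknown1" -> t._3, "unknown2" -> t._4))

  def main(args: Array[String]): Unit = {
    // Plain Seqs stand in for the tuples each per-layout CSV reader would return.
    val file1 = Seq((1L, "Alice", 30))
    val file2 = Seq((2L, "Bob", "x", "y"))
    val all: Seq[CommonRecord] = file1.map(fromLayout1) ++ file2.map(fromLayout2)
    all.foreach(println)
  }
}
```

In a Flink job, the two mapped DataSets could then be combined with `union`. This only works when every layout is known up front; for unknown layouts, a custom input format as described above is the more general route.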
Hi!

Fabian is referring to the "TypeSerializerOutputFormat" [1]. You can save your types in an efficient binary representation by calling:

dataset.write(new TypeSerializerOutputFormat<Type>(), "/your/file/path");

Greetings,
Stephan

On Wed, Feb 4, 2015 at 12:07 AM, Fabian Hueske wrote:
Hello Fabian and Stephan,
Thank you guys for your replies.

@Fabian: Could you please write a small example using SerializedOutputFormat and SerializedInputFormat? I tried the following:

dataset.write(new SerializedOutputFormat[MyClass], dataPath)

but it does not compile; the error says that the type arguments [...] do not conform to class SerializedOutputFormat's type parameter bounds. I also tried Stephan's advice to use TypeSerializerOutputFormat. That seems to work, but I do not know how to read the output back in.

@Stephan: writing the output with

dataset.write(new TypeSerializerOutputFormat[MyClass], outputDataPath)

works for me, but when I tried the same approach to read it back:

val readback = env.readFile[MyClass](new TypeSerializerInputFormat[MyClass], dataPath)

I get the error "unspecified value parameter x$1". When I tried to add a TypeSerializer, as IntelliJ suggests:

val readback = env.readFile[MyClass](new TypeSerializerInputFormat[MyClass](TypeSerializer[MyClass]), dataPath)

it says that TypeSerializer is not a value. I couldn't figure out a solution from the debug log. Is there a working example of using these in Scala?
Hi!

I would go with the TypeSerializerInputFormat. Here is a code sample (in Java; Scala should work the same way):

------------------------------------------------------------
DataSet<MyType> dataSet = ...;

// write it out
dataSet.write(new TypeSerializerOutputFormat<MyType>(), "path");

// read it in
DataSet<MyType> read = env.readFile(new TypeSerializerInputFormat<MyType>(dataSet.getType()), "path");
------------------------------------------------------------

The important thing to notice is that the TypeSerializerInputFormat needs the TypeInformation of the type to read (to know how to deserialize it). The simplest way to make sure that the type information used for reading is the same as that used for writing is to take it from the written data set (by calling getType() on the data set).

Greetings,
Stephan

On Wed, Feb 4, 2015 at 11:18 AM, Vinh June wrote:
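Why the reader needs exactly the same type information as the writer can be seen with a hand-written binary format. The sketch below uses plain `java.io.DataOutputStream`/`DataInputStream`, not Flink's TypeSerializer, and its record layout (id, then array length, then each string) is an assumption made for illustration: `read` only works because it consumes the fields in the same order and with the same types that `write` produced them.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Same shape as MyClass in the question: an id plus an array of strings.
final case class MyClass(id: Long, array: Array[String])

object BinaryRoundTrip {
  // Assumed layout: id (8 bytes), array length (4 bytes), then each string as modified UTF-8.
  def write(out: DataOutputStream, r: MyClass): Unit = {
    out.writeLong(r.id)
    out.writeInt(r.array.length)
    r.array.foreach(out.writeUTF)
  }

  // Must mirror write() exactly; any mismatch in order or type corrupts every later record.
  def read(in: DataInputStream): MyClass = {
    val id = in.readLong()
    val n = in.readInt()
    MyClass(id, Array.fill(n)(in.readUTF()))
  }

  def main(args: Array[String]): Unit = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    val records = Seq(MyClass(1L, Array("a", "b")), MyClass(2L, Array.empty[String]))
    records.foreach(write(out, _))
    out.flush()

    val in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val back = records.indices.map(_ => read(in))
    back.foreach(r => println(s"${r.id}: ${r.array.mkString(",")}"))
  }
}
```

The binary file itself carries no description of its layout, which is why the TypeSerializerInputFormat has to be handed the type information (or serializer) from outside.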
Thanks,
I just tried it and it works with Scala too. A small note for anyone who might be interested: the constructor of TypeSerializerInputFormat needs a TypeSerializer, not a TypeInformation. So this works in Scala:

---------------------------------------- [SCALA]
import org.apache.flink.api.scala._
import org.apache.flink.api.java.io.TypeSerializerInputFormat

val readback = env
  .readFile[MyClass](
    new TypeSerializerInputFormat[MyClass](dataSet.getType.createSerializer()),
    dataPath)
----------------------------------------

If write and readFile are in different objects, dataSet is not available where you call env.readFile, so you need to create the serializer from the class definition instead:

---------------------------------------- [SCALA]
val readback = env
  .readFile[MyClass](
    new TypeSerializerInputFormat[MyClass](createTypeInformation[MyClass].createSerializer()),
    dataPath)
----------------------------------------

Again, thank you @Stephan!
Nice! BTW: the TypeSerializerInputFormat just changed (in the 0.9-SNAPSHOT master) so that it now takes the type information rather than a type serializer.

Stephan

On Wed, Feb 4, 2015 at 11:52 AM, Vinh June wrote: