Re: read .gz files

Posted by Sebastian Schelter on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/read-gz-files-tp760p763.html

I tried to follow the example on the web page like this:

-----------------------------------------------------------------------

implicit val env = ExecutionEnvironment.getExecutionEnvironment

val job = Job.getInstance

val hadoopInput = new HadoopInputFormat[LongWritable,Text](
   new TextInputFormat, classOf[LongWritable], classOf[Text], job)

FileInputFormat.addInputPath(job, new Path("/home/ssc/pld-index.gz"))

val lines: DataSet[Tuple2[LongWritable, Text]] =
     env.createInput(hadoopInput)

val numLines = lines.map { _ => Tuple1(1) }
                       .sum(0)

numLines.printToErr()

env.execute()

-----------------------------------------------------------------------

Unfortunately, I get the following exception, which I cannot resolve:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Tuple needs to be parameterized by using generics.
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:258)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:201)
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:188)
        at io.ssc.trackthetrackers.analysis.statistics.Playing$delayedInit$body.apply(Playing.scala:24)
        at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
        at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
        at scala.App$$anonfun$main$1.apply(App.scala:71)
        at scala.App$$anonfun$main$1.apply(App.scala:71)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
        at scala.App$class.main(App.scala:71)
        at io.ssc.trackthetrackers.analysis.statistics.Playing$.main(Playing.scala:15)
        at io.ssc.trackthetrackers.analysis.statistics.Playing.main(Playing.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:483)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)

Any tips on how to proceed?

Best,
Sebastian

On 19.02.2015 21:36, Robert Metzger wrote:

> I just had a look at Hadoop's TextInputFormat.
> In hadoop-common-2.2.0.jar there are the following compression codecs
> contained:
>
> org.apache.hadoop.io.compress.BZip2Codec
> org.apache.hadoop.io.compress.DefaultCodec
> org.apache.hadoop.io.compress.DeflateCodec
> org.apache.hadoop.io.compress.GzipCodec
> org.apache.hadoop.io.compress.Lz4Codec
> org.apache.hadoop.io.compress.SnappyCodec
>
> (See also CompressionCodecFactory). So you should be good to go.
>
>
> On Thu, Feb 19, 2015 at 9:31 PM, Robert Metzger <[hidden email]
> <mailto:[hidden email]>> wrote:
>
>     Hi,
>
>     right now, Flink itself only has support for reading ".deflate"
>     files. It's basically the same algorithm as gzip, but gzip files
>     have a header that makes the two formats incompatible.
>
>     But you can easily use Hadoop InputFormats with Flink. I'm sure
>     there is a Hadoop InputFormat for reading gzip'ed files.
>
>
>     Best,
>     Robert
>
>
>     On Thu, Feb 19, 2015 at 9:25 PM, Sebastian <[hidden email]
>     <mailto:[hidden email]>> wrote:
>
>         Hi,
>
>         does Flink support reading gzipped files? I haven't found any
>         info about this on the website.
>
>         Best,
>         Sebastian
>
>
>
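The gzip-vs-deflate distinction discussed above can be seen with the JDK's own compression classes, independent of Flink or Hadoop. The sketch below uses only java.util.zip: gzip output is deflate-compressed data wrapped in a fixed header (magic bytes 0x1f 0x8b) plus a CRC trailer, which is why a reader expecting raw ".deflate" data cannot consume ".gz" files, even though the compressed body is the same algorithm.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.zip.{Deflater, GZIPInputStream, GZIPOutputStream}

val payload = "hello flink".getBytes("UTF-8")

// gzip = fixed header (magic bytes 0x1f 0x8b) + deflate-compressed body + CRC32 trailer
val gzBytes: Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  val gz = new GZIPOutputStream(bos)
  gz.write(payload)
  gz.close()
  bos.toByteArray
}

// raw deflate output of the same payload has no such header
val deflateBytes: Array[Byte] = {
  val d = new Deflater(Deflater.DEFAULT_COMPRESSION, true) // true = raw deflate, no zlib wrapper
  d.setInput(payload)
  d.finish()
  val buf = new Array[Byte](256)
  val n = d.deflate(buf)
  d.end()
  buf.take(n)
}

// the magic bytes are what a plain deflate reader trips over
println(f"gzip starts with: 0x${gzBytes(0) & 0xff}%02x 0x${gzBytes(1) & 0xff}%02x") // 0x1f 0x8b

// round-trip through GZIPInputStream to confirm the wrapped body decompresses back
val decoded = new String(
  new GZIPInputStream(new ByteArrayInputStream(gzBytes)).readAllBytes(), "UTF-8")
println(decoded) // hello flink
```

This is only an illustration of the file-format difference; for actually reading .gz files in Flink, the HadoopInputFormat route from the reply above applies, since Hadoop's TextInputFormat selects a codec (e.g. GzipCodec) by file extension.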