Re: read .gz files

Posted by Sebastian Schelter
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/read-gz-files-tp760p766.html

Upgrading to 0.8.1 helped, thx!
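
For reference, the version bump as a minimal sbt sketch, assuming the
0.8-era artifact coordinates (verify against Maven Central and adjust
to your build tool):

// build.sbt (sketch; 0.8.x artifacts carried no Scala-version suffix)
libraryDependencies ++= Seq(
  "org.apache.flink" % "flink-scala" % "0.8.1",
  "org.apache.flink" % "flink-clients" % "0.8.1",
  "org.apache.flink" % "flink-hadoop-compatibility" % "0.8.1"
)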

On 19.02.2015 22:08, Robert Metzger wrote:

> Hey,
>
> are you using Flink 0.8.0? I think we've added support for Hadoop input
> formats with Scala in 0.8.1 and 0.9 (master).
>
> The following code just printed the list of all page titles of the
> Catalan Wikipedia ;)
> (built against master)
>
> // imports needed by this snippet:
> import org.apache.flink.api.scala._
> import org.apache.hadoop.fs.Path
> import org.apache.hadoop.io.{LongWritable, Text}
> import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
>
> def main(args: Array[String]) {
>
>    val env = ExecutionEnvironment.getExecutionEnvironment
>    val job = new JobConf()
>    val hadoopInput = new TextInputFormat()
>    FileInputFormat.addInputPath(job,
>      new Path("/home/robert/Downloads/cawiki-20140407-all-titles.gz"))
>    val lines = env.createHadoopInput(hadoopInput, classOf[LongWritable], classOf[Text], job)
>
>    lines.print
>
>    env.execute("Scala WordCount Example")
> }
>
>
>
>
> On Thu, Feb 19, 2015 at 9:56 PM, Sebastian <[hidden email]> wrote:
>
>     I tried to follow the example on the web page like this:
>
>     -----------------------------------------------------------------------
>
>     implicit val env = ExecutionEnvironment.getExecutionEnvironment
>
>     val job = Job.getInstance
>
>     val hadoopInput = new HadoopInputFormat[LongWritable, Text](
>        new TextInputFormat, classOf[LongWritable], classOf[Text], job)
>
>     FileInputFormat.addInputPath(job, new Path("/home/ssc/pld-index.gz"))
>
>     val lines: DataSet[Tuple2[LongWritable, Text]] =
>          env.createInput(hadoopInput)
>
>     val numLines = lines.map { _ => Tuple1(1) }
>                            .sum(0)
>
>     numLines.printToErr()
>
>     env.execute()
>
>     -----------------------------------------------------------------------
>
>     Unfortunately, I get the following exception, which I cannot resolve:
>
>     Exception in thread "main"
>     org.apache.flink.api.common.functions.InvalidTypesException:
>     Tuple needs to be parameterized by using generics.
>              at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:258)
>              at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:201)
>              at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:188)
>              at io.ssc.trackthetrackers.analysis.statistics.Playing$delayedInit$body.apply(Playing.scala:24)
>              at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
>              at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>              at scala.App$$anonfun$main$1.apply(App.scala:71)
>              at scala.App$$anonfun$main$1.apply(App.scala:71)
>              at scala.collection.immutable.List.foreach(List.scala:318)
>              at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
>              at scala.App$class.main(App.scala:71)
>              at io.ssc.trackthetrackers.analysis.statistics.Playing$.main(Playing.scala:15)
>              at io.ssc.trackthetrackers.analysis.statistics.Playing.main(Playing.scala)
>              at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>              at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>              at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>              at java.lang.reflect.Method.invoke(Method.java:483)
>              at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
>
>     Any tips on how to proceed?
>
>     Best,
>     Sebastian
>
>
>     On 19.02.2015 21:36, Robert Metzger wrote:
>
>         I just had a look at Hadoop's TextInputFormat.
>         hadoop-common-2.2.0.jar contains the following compression codecs:
>
>         org.apache.hadoop.io.compress.BZip2Codec
>         org.apache.hadoop.io.compress.DefaultCodec
>         org.apache.hadoop.io.compress.DeflateCodec
>         org.apache.hadoop.io.compress.GzipCodec
>         org.apache.hadoop.io.compress.Lz4Codec
>         org.apache.hadoop.io.compress.SnappyCodec
>
>         (See also CompressionCodecFactory). So you should be good to go.
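>
>         To check which codec Hadoop will pick for a given file, you can
>         ask CompressionCodecFactory directly; it resolves the codec from
>         the file extension. A minimal sketch, assuming hadoop-common on
>         the classpath (the path is the one from Sebastian's mail):
>
>         import org.apache.hadoop.conf.Configuration
>         import org.apache.hadoop.fs.Path
>         import org.apache.hadoop.io.compress.CompressionCodecFactory
>
>         object CodecCheck extends App {
>           val factory = new CompressionCodecFactory(new Configuration())
>           // getCodec matches on the extension: ".gz" resolves to GzipCodec
>           val codec = factory.getCodec(new Path("/home/ssc/pld-index.gz"))
>           println(if (codec == null) "no codec found" else codec.getClass.getName)
>         }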
>
>
>         On Thu, Feb 19, 2015 at 9:31 PM, Robert Metzger <[hidden email]> wrote:
>
>              Hi,
>
>              right now Flink itself only has support for reading ".deflate"
>              files. It's basically the same algorithm as gzip, but gzip files
>              wrap the raw deflate stream in an extra header and checksum,
>              which makes the two formats incompatible.
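>
>              A minimal sketch of that built-in path, assuming decompression
>              is keyed off the ".deflate" file extension (the path below is
>              hypothetical):
>
>              import org.apache.flink.api.scala._
>
>              object DeflateRead {
>                def main(args: Array[String]) {
>                  val env = ExecutionEnvironment.getExecutionEnvironment
>                  // Flink's built-in input format inflates ".deflate" files
>                  val lines = env.readTextFile("/path/to/data.deflate")
>                  lines.print
>                  env.execute("read deflate")
>                }
>              }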
>
>              But you can easily use HadoopInputFormats with Flink. I'm
>         sure there
>              is a Hadoop IF for reading gzip'ed files.
>
>
>              Best,
>              Robert
>
>
>              On Thu, Feb 19, 2015 at 9:25 PM, Sebastian <[hidden email]> wrote:
>
>                  Hi,
>
>                  does Flink support reading gzipped files? I haven't found
>                  any info about this on the website.
>
>                  Best,
>                  Sebastian
>