http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/read-gz-files-tp760p766.html
> Hey,
>
> are you using Flink 0.8.0? I think we added support for Hadoop input
> formats with Scala in 0.8.1 and 0.9 (master).
>
> The following code just printed the list of all page titles of the
> Catalan Wikipedia for me ;)
> (built against master)
>
> def main(args: Array[String]) {
>
>   val env = ExecutionEnvironment.getExecutionEnvironment
>   val job = new JobConf()
>   val hadoopInput = new TextInputFormat()
>   FileInputFormat.addInputPath(job, new Path("/home/robert/Downloads/cawiki-20140407-all-titles.gz"))
>   val lines = env.createHadoopInput(hadoopInput, classOf[LongWritable], classOf[Text], job)
>
>   lines.print
>
>   env.execute("Scala WordCount Example")
> }
>
>
>
>
> On Thu, Feb 19, 2015 at 9:56 PM, Sebastian <[hidden email]> wrote:
>
> I tried to follow the example on the web page like this:
>
> ---------------------------------------------------------------------------
>
> implicit val env = ExecutionEnvironment.getExecutionEnvironment
>
> val job = Job.getInstance
>
> val hadoopInput = new HadoopInputFormat[LongWritable, Text](
>   new TextInputFormat, classOf[LongWritable], classOf[Text], job)
>
> FileInputFormat.addInputPath(job, new Path("/home/ssc/pld-index.gz"))
>
> val lines: DataSet[Tuple2[LongWritable, Text]] =
>   env.createInput(hadoopInput)
>
> val numLines = lines.map { _ => Tuple1(1) }
>   .sum(0)
>
> numLines.printToErr()
>
> env.execute()
>
> ---------------------------------------------------------------------------
>
> Unfortunately, I get the following exception, which I cannot resolve:
>
> Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Tuple needs to be parameterized by using generics.
>   at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:258)
>   at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:201)
>   at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:188)
>   at io.ssc.trackthetrackers.analysis.statistics.Playing$delayedInit$body.apply(Playing.scala:24)
>   at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
>   at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>   at scala.App$$anonfun$main$1.apply(App.scala:71)
>   at scala.App$$anonfun$main$1.apply(App.scala:71)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
>   at scala.App$class.main(App.scala:71)
>   at io.ssc.trackthetrackers.analysis.statistics.Playing$.main(Playing.scala:15)
>   at io.ssc.trackthetrackers.analysis.statistics.Playing.main(Playing.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:483)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
>
> Any tips on how to proceed?
>
> Best,
> Sebastian
>
>
>
>
>
>
>
> On 19.02.2015 21:36, Robert Metzger wrote:
>
> I just had a look at Hadoop's TextInputFormat.
> hadoop-common-2.2.0.jar contains the following compression codecs:
>
> org.apache.hadoop.io.compress.BZip2Codec
> org.apache.hadoop.io.compress.DefaultCodec
> org.apache.hadoop.io.compress.DeflateCodec
> org.apache.hadoop.io.compress.GzipCodec
> org.apache.hadoop.io.compress.Lz4Codec
> org.apache.hadoop.io.compress.SnappyCodec
>
> (See also CompressionCodecFactory). So you should be good to go.
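
Editor's sketch: Hadoop's `CompressionCodecFactory.getCodec(Path)` selects a codec by the file name's suffix, which is why a `.gz` path is decompressed without extra configuration. The snippet below is only a simplified, dependency-free model of that lookup, not Hadoop's implementation. The object `CodecBySuffix` and its method `codecFor` are hypothetical names introduced for illustration; the suffixes are the default extensions of the codecs listed above.

```scala
// Simplified model of suffix-based codec selection (hypothetical helper,
// not part of Hadoop). It needs no Hadoop classes on the classpath.
object CodecBySuffix {

  // Default file suffix -> codec class name, for the codecs listed above.
  // DeflateCodec shares the ".deflate" suffix with DefaultCodec; mapping the
  // suffix to DefaultCodec here is an arbitrary choice made for this sketch.
  val suffixToCodec: Seq[(String, String)] = Seq(
    ".bz2"     -> "org.apache.hadoop.io.compress.BZip2Codec",
    ".deflate" -> "org.apache.hadoop.io.compress.DefaultCodec",
    ".gz"      -> "org.apache.hadoop.io.compress.GzipCodec",
    ".lz4"     -> "org.apache.hadoop.io.compress.Lz4Codec",
    ".snappy"  -> "org.apache.hadoop.io.compress.SnappyCodec"
  )

  // Returns the codec class name whose suffix the file name ends with,
  // or None when no suffix matches (the file is then read uncompressed).
  def codecFor(fileName: String): Option[String] =
    suffixToCodec.collectFirst {
      case (suffix, codec) if fileName.endsWith(suffix) => codec
    }

  def main(args: Array[String]): Unit = {
    println(codecFor("pld-index.gz"))
    println(codecFor("plain.txt"))
  }
}
```

With Hadoop itself on the classpath, the real lookup would go through `new CompressionCodecFactory(conf).getCodec(path)` instead of this table.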
>
>
> On Thu, Feb 19, 2015 at 9:31 PM, Robert Metzger <[hidden email]> wrote:
>
> Hi,
>
> right now Flink itself only supports reading ".deflate" files. It's
> basically the same algorithm as gzip, but gzip files seem to have
> some header that makes the two formats incompatible.
>
> But you can easily use HadoopInputFormats with Flink. I'm sure there
> is a Hadoop input format for reading gzip'ed files.
>
>
> Best,
> Robert
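
Editor's sketch: the remark above that gzip files carry a header which a plain deflate reader rejects can be reproduced with the JDK's `java.util.zip` classes alone. This is a minimal illustration under the assumption that the JDK's zlib-wrapped deflate stream stands in for a ".deflate" file; it does not use Flink's reader. The object `GzipVsDeflate` and its methods are hypothetical names introduced for the example.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets
import java.util.zip.{DeflaterOutputStream, GZIPOutputStream, InflaterInputStream, ZipException}

// Hypothetical helper: compresses the same payload into both containers
// and checks which of them a zlib/deflate reader accepts.
object GzipVsDeflate {

  // Compresses data with DeflaterOutputStream (zlib-wrapped deflate).
  def zlibBytes(data: Array[Byte]): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DeflaterOutputStream(buffer)
    out.write(data)
    out.close()
    buffer.toByteArray
  }

  // Compresses data with GZIPOutputStream (deflate inside a gzip container).
  def gzipBytes(data: Array[Byte]): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new GZIPOutputStream(buffer)
    out.write(data)
    out.close()
    buffer.toByteArray
  }

  // True if the bytes start with the gzip magic number 0x1f 0x8b.
  def isGzip(bytes: Array[Byte]): Boolean =
    bytes.length >= 2 && (bytes(0) & 0xff) == 0x1f && (bytes(1) & 0xff) == 0x8b

  // True if a zlib/deflate reader can consume the whole stream without a ZipException.
  def inflatesAsZlib(bytes: Array[Byte]): Boolean =
    try {
      val in = new InflaterInputStream(new ByteArrayInputStream(bytes))
      try { while (in.read() != -1) {} } finally in.close()
      true
    } catch {
      case _: ZipException => false
    }

  def main(args: Array[String]): Unit = {
    val payload = "same payload, two containers".getBytes(StandardCharsets.UTF_8)
    println(s"zlib stream readable by inflater: ${inflatesAsZlib(zlibBytes(payload))}")
    println(s"gzip stream readable by inflater: ${inflatesAsZlib(gzipBytes(payload))}")
    println(s"gzip stream has gzip magic bytes: ${isGzip(gzipBytes(payload))}")
  }
}
```

The inflater accepts the zlib stream and rejects the gzip stream at its header, which matches the incompatibility described in the mail even though both containers hold deflate-compressed data.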
>
>
> On Thu, Feb 19, 2015 at 9:25 PM, Sebastian <[hidden email]> wrote:
>
> Hi,
>
> does Flink support reading gzipped files? I haven't found any info
> about this on the website.
>
> Best,
> Sebastian
>
>
>
>