Posted by Sebastian Schelter
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/read-gz-files-tp760p763.html
I tried to follow the example on the web page like this:
-----------------------------------------------------------------------
// imports added for completeness; HadoopInputFormat comes from the
// flink-hadoop-compatibility module, and its package name may differ
// across Flink versions
import org.apache.flink.api.scala._
import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}

implicit val env = ExecutionEnvironment.getExecutionEnvironment

val job = Job.getInstance
val hadoopInput = new HadoopInputFormat[LongWritable, Text](
  new TextInputFormat, classOf[LongWritable], classOf[Text], job)
FileInputFormat.addInputPath(job, new Path("/home/ssc/pld-index.gz"))

val lines: DataSet[Tuple2[LongWritable, Text]] =
  env.createInput(hadoopInput)

val numLines = lines.map { _ => Tuple1(1) }
  .sum(0)

numLines.printToErr()
env.execute()
-----------------------------------------------------------------------
Unfortunately, I get the following exception, which I cannot resolve:
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Tuple needs to be parameterized by using generics.
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:258)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:201)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:188)
    at io.ssc.trackthetrackers.analysis.statistics.Playing$delayedInit$body.apply(Playing.scala:24)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.App$class.main(App.scala:71)
    at io.ssc.trackthetrackers.analysis.statistics.Playing$.main(Playing.scala:15)
    at io.ssc.trackthetrackers.analysis.statistics.Playing.main(Playing.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Any tips on how to proceed?
Best,
Sebastian
On 19.02.2015 21:36, Robert Metzger wrote:
> I just had a look at Hadoop's TextInputFormat.
> hadoop-common-2.2.0.jar contains the following compression codecs:
>
> org.apache.hadoop.io.compress.BZip2Codec
> org.apache.hadoop.io.compress.DefaultCodec
> org.apache.hadoop.io.compress.DeflateCodec
> org.apache.hadoop.io.compress.GzipCodec
> org.apache.hadoop.io.compress.Lz4Codec
> org.apache.hadoop.io.compress.SnappyCodec
>
> (See also CompressionCodecFactory). So you should be good to go.
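To illustrate what CompressionCodecFactory does with that codec list: it resolves a codec from the file name's suffix. Here is a toy sketch of that lookup (the class and table are mine, not the Hadoop API; the suffixes are the codecs' default file extensions):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SuffixCodecLookup {

    // Maps default file extensions to the codec classes listed above.
    static final Map<String, String> CODEC_BY_SUFFIX = new LinkedHashMap<>();
    static {
        CODEC_BY_SUFFIX.put(".bz2", "org.apache.hadoop.io.compress.BZip2Codec");
        CODEC_BY_SUFFIX.put(".deflate", "org.apache.hadoop.io.compress.DeflateCodec");
        CODEC_BY_SUFFIX.put(".gz", "org.apache.hadoop.io.compress.GzipCodec");
        CODEC_BY_SUFFIX.put(".lz4", "org.apache.hadoop.io.compress.Lz4Codec");
        CODEC_BY_SUFFIX.put(".snappy", "org.apache.hadoop.io.compress.SnappyCodec");
    }

    // Returns the codec class for a path, or null if the file should be
    // read as plain, uncompressed text.
    static String codecFor(String path) {
        for (Map.Entry<String, String> e : CODEC_BY_SUFFIX.entrySet()) {
            if (path.endsWith(e.getKey())) {
                return e.getValue();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // -> org.apache.hadoop.io.compress.GzipCodec
        System.out.println(codecFor("/home/ssc/pld-index.gz"));
    }
}
```

So a `.gz` input path is picked up transparently by TextInputFormat, with no extra configuration.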
>
>
> On Thu, Feb 19, 2015 at 9:31 PM, Robert Metzger <[hidden email]> wrote:
>
> Hi,
>
> right now Flink itself only has support for reading ".deflate"
> files. It's basically the same compression algorithm as gzip, but gzip
> files have an extra header, which makes the two formats incompatible.
>
> But you can easily use Hadoop InputFormats with Flink. I'm sure there
> is a Hadoop InputFormat for reading gzipped files.
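The header difference mentioned here is easy to see with plain JDK streams. A minimal sketch (class and method names are mine), comparing the first bytes that gzip and plain DEFLATE emit for the same payload:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.Deflater;
import java.util.zip.GZIPOutputStream;

public class GzipVsDeflate {

    // gzip: a DEFLATE body wrapped in a header (magic bytes 0x1f 0x8b)
    // plus a CRC32/length trailer.
    static byte[] gzipBytes(byte[] payload) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            try (GZIPOutputStream gz = new GZIPOutputStream(buf)) {
                gz.write(payload);
            }
            return buf.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // zlib-wrapped DEFLATE: same algorithm, but a different two-byte
    // header and no gzip magic, hence the incompatibility.
    static byte[] deflateBytes(byte[] payload) {
        Deflater deflater = new Deflater();
        deflater.setInput(payload);
        deflater.finish();
        byte[] out = new byte[payload.length * 2 + 64];
        int n = deflater.deflate(out);
        byte[] result = new byte[n];
        System.arraycopy(out, 0, result, 0, n);
        return result;
    }

    public static void main(String[] args) {
        byte[] payload = "hello flink".getBytes();
        byte[] gzip = gzipBytes(payload);
        byte[] deflate = deflateBytes(payload);
        System.out.printf("gzip starts with:    0x%02x 0x%02x%n",
                gzip[0] & 0xff, gzip[1] & 0xff);
        System.out.printf("deflate starts with: 0x%02x 0x%02x%n",
                deflate[0] & 0xff, deflate[1] & 0xff);
    }
}
```

The gzip output always begins with the magic bytes 0x1f 0x8b, which a ".deflate" reader does not expect.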
>
>
> Best,
> Robert
>
>
> On Thu, Feb 19, 2015 at 9:25 PM, Sebastian <[hidden email]> wrote:
>
> Hi,
>
> does Flink support reading gzipped files? I haven't found any info
> about this on the website.
>
> Best,
> Sebastian
>