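import org.apache.flink.api.scala._
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}

def main(args: Array[String]) {
  val env = ExecutionEnvironment.getExecutionEnvironment
  // Hadoop's (mapred) TextInputFormat picks the decompression codec from the file extension
  val job = new JobConf()
  val hadoopInput = new TextInputFormat()
  FileInputFormat.addInputPath(job, new Path("/home/robert/Downloads/cawiki-20140407-all-titles.gz"))
  val lines = env.createHadoopInput(hadoopInput, classOf[LongWritable], classOf[Text], job)
  lines.print
  env.execute("Scala WordCount Example")
}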
I tried to follow the example on the web page like this:
-----------------------------------------------------------------------
implicit val env = ExecutionEnvironment.getExecutionEnvironment
val job = Job.getInstance
val hadoopInput = new HadoopInputFormat[LongWritable,Text](
new TextInputFormat, classOf[LongWritable], classOf[Text], job)
FileInputFormat.addInputPath(job, new Path("/home/ssc/pld-index.gz"))
val lines: DataSet[Tuple2[LongWritable, Text]] =
env.createInput(hadoopInput)
val numLines = lines.map { _ => Tuple1(1) }
.sum(0)
numLines.printToErr()
env.execute()
-----------------------------------------------------------------------
Unfortunately, I get the following exception, which I cannot resolve:
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Tuple needs to be parameterized by using generics.
at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:258)
at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:201)
at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:188)
at io.ssc.trackthetrackers.analysis.statistics.Playing$delayedInit$body.apply(Playing.scala:24)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.App$class.main(App.scala:71)
at io.ssc.trackthetrackers.analysis.statistics.Playing$.main(Playing.scala:15)
at io.ssc.trackthetrackers.analysis.statistics.Playing.main(Playing.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Any tips on how to proceed?
Best,
Sebastian
On 19.02.2015 21:36, Robert Metzger wrote:
I just had a look at Hadoop's TextInputFormat.
hadoop-common-2.2.0.jar contains the following compression codecs:
org.apache.hadoop.io.compress.BZip2Codec
org.apache.hadoop.io.compress.DefaultCodec
org.apache.hadoop.io.compress.DeflateCodec
org.apache.hadoop.io.compress.GzipCodec
org.apache.hadoop.io.compress.Lz4Codec
org.apache.hadoop.io.compress.SnappyCodec
(See also CompressionCodecFactory). So you should be good to go.
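As a rough illustration of the CompressionCodecFactory lookup mentioned above, the sketch below asks the factory which codec it would pick for a few file names. It assumes hadoop-common is on the classpath; the object name CodecLookup, the helper codecNameFor and the example file names are made up for this sketch and are not from the thread.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory

// Hypothetical helper object, only for illustration.
object CodecLookup {

  // Class name of the codec Hadoop selects for `file`, based on its extension.
  // getCodec returns null when no codec matches, hence the Option wrapper.
  def codecNameFor(file: String, factory: CompressionCodecFactory): Option[String] =
    Option(factory.getCodec(new Path(file))).map(_.getClass.getName)

  def main(args: Array[String]): Unit = {
    // A default Configuration is assumed here; a cluster setup may register other codecs.
    val factory = new CompressionCodecFactory(new Configuration())
    Seq("pld-index.gz", "pld-index.bz2", "pld-index.txt").foreach { file =>
      val codec = codecNameFor(file, factory).getOrElse("no codec, read as plain text")
      println(s"$file -> $codec")
    }
  }
}
```

The lookup goes purely by file extension, so a gzipped file without the ".gz" suffix would most likely be read as plain text.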
On Thu, Feb 19, 2015 at 9:31 PM, Robert Metzger wrote:
Hi,
Right now Flink itself only supports reading ".deflate"
files. It's basically the same algorithm as gzip, but gzip files seem
to have a header that makes the two formats incompatible.
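To make the header difference concrete, here is a minimal sketch that uses only java.util.zip from the JDK (no Flink or Hadoop involved). It assumes that ".deflate" means the zlib-wrapped DEFLATE stream that DeflaterOutputStream writes by default; the object name GzipVsDeflate and its helpers are made up for illustration.

```scala
import java.io.{ByteArrayOutputStream, OutputStream}
import java.util.zip.{DeflaterOutputStream, GZIPOutputStream}

// Hypothetical helper object, only for illustration.
object GzipVsDeflate {

  // Compress `data` through the stream built by `wrap` and return the bytes written.
  def compress(data: Array[Byte])(wrap: OutputStream => OutputStream): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = wrap(buffer)
    out.write(data)
    out.close() // finishes the stream and writes any trailer
    buffer.toByteArray
  }

  // First two bytes of a compressed stream as lowercase hex, e.g. "1f8b" for gzip.
  def magic(bytes: Array[Byte]): String =
    bytes.take(2).map(b => f"${b & 0xff}%02x").mkString

  def main(args: Array[String]): Unit = {
    val data = "some text to compress".getBytes("UTF-8")
    val gzip = compress(data)(new GZIPOutputStream(_))
    val deflate = compress(data)(new DeflaterOutputStream(_))
    println(s"gzip starts with:    ${magic(gzip)}")
    println(s"deflate starts with: ${magic(deflate)}")
  }
}
```

Both streams carry the same DEFLATE-compressed payload, but gzip output always begins with the magic bytes 1f 8b, while the zlib-wrapped stream begins with a different two-byte header. A reader that expects one framing will reject the other.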
But you can easily use HadoopInputFormats with Flink. I'm sure there
is a Hadoop InputFormat for reading gzip'ed files.
Best,
Robert
On Thu, Feb 19, 2015 at 9:25 PM, Sebastian wrote:
Hi,
Does Flink support reading gzipped files? I haven't found any info
about this on the website.
Best,
Sebastian