Hi Guys,
The Flink version in use is 1.9.0.
The job writes its final results to HDFS in SequenceFile format. The output code is as follows:
    HadoopOutputFormat<Text, DoubleWritable> hadoopOF =
        new HadoopOutputFormat<>(new SequenceFileOutputFormat<>(), job);
    SequenceFileOutputFormat.setOutputPath(job, new Path(params.get("output")));

    finalPageRanks
        .map((MapFunction<Tuple2<String, Double>, Tuple2<Text, DoubleWritable>>) in -> {
            Tuple2<Text, DoubleWritable> out = new Tuple2<>();
            out.f0 = new Text(in.f0);
            out.f1 = new DoubleWritable(in.f1);
            return out;
        })
        .returns(TypeInformation.of(new TypeHint<Tuple2<Text, DoubleWritable>>() {}))
        .output(hadoopOF);
When the job reaches the sink operator, it fails with the following exception:
It seems the key and value types of the output format are not initialized correctly, but I don't know where to set them.
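My unverified guess is that the key and value classes have to be declared on the Hadoop Job itself, since as far as I can tell it otherwise falls back to its default output classes rather than Text and DoubleWritable. Below is a minimal sketch of what I would try, assuming that is the cause. The class SequenceFileOutputTypes and its two methods are hypothetical helpers I made up for illustration; only Job.setOutputKeyClass and Job.setOutputValueClass are Hadoop calls. It needs the Hadoop MapReduce client libraries on the classpath.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

// Hypothetical helper for illustration; not part of the original job.
class SequenceFileOutputTypes {

    // Declare the key/value classes that the SequenceFile writer should expect.
    // Assumption: the sink fails because these were never set on the Job.
    static Job declareOutputTypes(Job job) {
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        return job;
    }

    // Create a fresh Job with the output types declared.
    // The checked IOException from Job.getInstance() is rethrown unchecked.
    static Job newConfiguredJob() {
        try {
            return declareOutputTypes(Job.getInstance());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Job job = newConfiguredJob();
        // Print the declared classes to confirm they were picked up.
        System.out.println(job.getOutputKeyClass().getName());
        System.out.println(job.getOutputValueClass().getName());
    }
}
```

In my code this would amount to calling job.setOutputKeyClass(Text.class) and job.setOutputValueClass(DoubleWritable.class) on the same job object that is passed to HadoopOutputFormat. I have not confirmed that this resolves the failure.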
Thanks for your help!
Qi