Flink - Hadoop Connectivity - Unable to read file


Samik Mukherjee
Hi All,

I am trying to read a file from HDFS, which is installed locally, but I am not able to. I tried both of the approaches below, but each time the program ends with "Process finished with exit code 239." Any help would be appreciated.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class Processor {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);

        // Wrap Hadoop's TextInputFormat so Flink can use it as an input source.
        Job job = Job.getInstance();
        HadoopInputFormat<LongWritable, Text> hadoopInputFormat =
                new HadoopInputFormat<>(new TextInputFormat(), LongWritable.class, Text.class, job);
        TextInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/newfolder/testdata1"));

        DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);
        DataSet<Tuple2<String, Integer>> words = text.flatMap(new Tokenizer());

        words.print();
        env.execute("Processor");
    }

    public static final class Tokenizer extends RichFlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {

        @Override
        public void flatMap(Tuple2<LongWritable, Text> value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // normalize and split the line
            String line = value.f1.toString();
            String[] tokens = line.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

The second approach uses HadoopInputs.readHadoopFile instead of constructing the HadoopInputFormat manually:

    // Additional imports needed for this version:
    // org.apache.flink.api.common.functions.MapFunction
    // org.apache.flink.hadoopcompatibility.HadoopInputs
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);

        // readHadoopFile wraps the Hadoop TextInputFormat and the input path in a single call.
        DataSet<Tuple2<LongWritable, Text>> input = env.createInput(HadoopInputs.readHadoopFile(
                new TextInputFormat(), LongWritable.class, Text.class,
                "hdfs://localhost:9000/newfolder/testdata1"));

        // Keep only the line contents (the value of each key/value record).
        DataSet<String> stringInput = input.map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
            @Override
            public String map(Tuple2<LongWritable, Text> longWritableTextTuple2) throws Exception {
                return longWritableTextTuple2.f1.toString();
            }
        });

        stringInput.print();
        env.execute("Processor");
    }
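
For comparison, reading the same path with Flink's built-in text source looks roughly like this (a minimal sketch; the class name PlainTextRead is just for illustration, everything else uses the same path and setup as above):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class PlainTextRead {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Flink's built-in text source reads the HDFS path directly,
        // without going through the Hadoop input format wrappers.
        DataSet<String> lines = env.readTextFile("hdfs://localhost:9000/newfolder/testdata1");

        // print() collects the data set and writes it to stdout.
        lines.print();
    }
}

Note that print() on a DataSet triggers execution on its own, so no separate env.execute() call is shown in this sketch.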