Hi,
I want to read all JSON files from HDFS, including the partition folders.

public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // path: hdfs://localhost:8020/data/ingestion/ingestion.raw.product/2018/12/05/23
    DataStream<String> df = env.readTextFile("hdfs://localhost:8020/data/ingestion/ingestion.raw.product");
    try {
        df.print();
        env.execute("dfg");
    } catch (Exception e) {
        e.printStackTrace();
    }
}
Hi,
If the question is how to read all files from an HDFS directory: in general, each file is potentially a different DataSet (not DataStream), and you need to decide how to combine/join them in the Flink pipeline. If the files are small enough, you could list them as string paths and use env.fromCollection to start the pipeline, then load each file into memory in a map operation and transform the file contents into records for the next stage. Best, Andrey
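A minimal sketch of the approach described above, assuming the files are small. The class name, the example file paths, and the file-reading code here are illustrative, not something given in the original reply:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadSmallFilesByPath {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Illustrative list of file paths; in practice you would enumerate the
        // partition folders on HDFS and collect the paths first.
        List<String> paths = Arrays.asList(
                "hdfs://localhost:8020/data/ingestion/ingestion.raw.product/2018/12/05/23/file1.json",
                "hdfs://localhost:8020/data/ingestion/ingestion.raw.product/2018/12/05/23/file2.json");

        // Start the pipeline from the list of paths and load each file's
        // contents in a map operation (only reasonable for small files).
        DataStream<String> fileContents = env
                .fromCollection(paths)
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String p) throws Exception {
                        Path path = new Path(p);
                        FileSystem fs = path.getFileSystem();
                        try (BufferedReader reader = new BufferedReader(
                                new InputStreamReader(fs.open(path), StandardCharsets.UTF_8))) {
                            return reader.lines().collect(Collectors.joining("\n"));
                        }
                    }
                });

        fileContents.print();
        env.execute("read-small-files-by-path");
    }
}

Each element of the resulting stream is the full contents of one file, which can then be parsed as JSON in a subsequent operator.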
Actually, does it not work if you just provide the directory to env.readTextFile, as in your code example? Or what exactly is the problem?
Hi Rakesh,
So the problem is that you want your Flink job to monitor the '/data/ingestion/ingestion-raw-product' path for new files inside and process them when they appear, right? Can you try env.readFile with watchType = FileProcessingMode.PROCESS_CONTINUOUSLY? For an example, look at how env.readTextFile(String filePath) and env.readTextFile(String filePath, String charsetName) are implemented: they are based on env.readFile, but with FileProcessingMode.PROCESS_ONCE, which processes only the files that are currently in the path. Best, Andrey
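A rough sketch of env.readFile with PROCESS_CONTINUOUSLY for the directory from the question. The 10-second scan interval and the setNestedFileEnumeration call (to descend into the partition sub-folders) are assumptions added for illustration, not something the reply specifies:

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class MonitorPartitionFolders {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String path = "hdfs://localhost:8020/data/ingestion/ingestion.raw.product";

        TextInputFormat format = new TextInputFormat(new Path(path));
        // Descend into the year/month/day/hour partition sub-folders.
        format.setNestedFileEnumeration(true);

        // PROCESS_CONTINUOUSLY keeps monitoring the directory and picks up new
        // files; the last argument is the scan interval in milliseconds.
        DataStream<String> lines = env.readFile(
                format,
                path,
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                10_000L);

        lines.print();
        env.execute("monitor-partition-folders");
    }
}

Note that with PROCESS_CONTINUOUSLY a file whose contents change is re-processed in full, so appending to existing files leads to duplicates; writing new files per partition folder works better with this mode.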