Read all JSON files from an HDFS partition folder

Read all JSON files from an HDFS partition folder

Rakesh Kumar
Hi,

I want to read all JSON files from an HDFS path that contains partition folders.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public static void main(String[] args) {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Files live under hourly partition folders, e.g.:
    // hdfs://localhost:8020/data/ingestion/ingestion.raw.product/2018/12/05/23
    DataStream<String> df = env.readTextFile("hdfs://localhost:8020/data/ingestion/ingestion.raw.product");

    try {
        df.print();
        env.execute("dfg");
    } catch (Exception e) {
        e.printStackTrace();
    }
}
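Note that readTextFile emits every line of every file under the path as a separate String record. Assuming the files are newline-delimited JSON (one object per line) and that Jackson is on the classpath, a minimal sketch of turning those lines into JSON records could look like this; the parser choice and the one-object-per-line layout are assumptions, not something the original post states:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.MapFunction;

// Parse each text line into a Jackson JsonNode.
DataStream<JsonNode> records = df.map(new MapFunction<String, JsonNode>() {
    // ObjectMapper is not serializable, so create it lazily on the worker.
    private transient ObjectMapper mapper;

    @Override
    public JsonNode map(String line) throws Exception {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        return mapper.readTree(line);
    }
});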


Re: Read all JSON files from an HDFS partition folder

Andrey Zagrebin
Hi,

If the question is how to read all files from an HDFS directory: in general, each file is potentially a different DataSet (not a DataStream), so you need to decide how to combine/join them in the Flink pipeline.

If the files are small enough, you could list them as string paths and use env.fromCollection to start the pipeline. Then, in a map operation, load each file into memory by its path and transform the file contents into records for the next stage, as sketched below.

Best,
Andrey
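A minimal sketch of that approach, assuming small files and a path list that is known (or obtained by listing the partition folders with the HDFS client) before the job starts; the class name, example path, and buffer handling are illustrative, not from the original thread:

import java.io.DataInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadFilesByPath {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical file list; in practice it could be built by listing
        // the partition folders with the HDFS client before job submission.
        List<String> paths = Arrays.asList(
                "hdfs://localhost:8020/data/ingestion/ingestion-raw-product/2018/12/05/23/part-0000.json");

        DataStream<String> fileContents = env
                .fromCollection(paths)
                .map(new RichMapFunction<String, String>() {
                    @Override
                    public String map(String pathString) throws Exception {
                        Path path = new Path(pathString);
                        FileSystem fs = path.getFileSystem();
                        int len = (int) fs.getFileStatus(path).getLen();
                        byte[] buf = new byte[len];
                        // Load the whole file -- only sensible for small files.
                        try (DataInputStream in = new DataInputStream(fs.open(path))) {
                            in.readFully(buf);
                        }
                        return new String(buf, StandardCharsets.UTF_8);
                    }
                });

        fileContents.print();
        env.execute("read-files-by-path");
    }
}

Each element downstream of the map is then one whole file's contents, which a later operation can split into individual JSON records.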



Re: Read all JSON files from an HDFS partition folder

Andrey Zagrebin
Actually, does it not work if you simply pass the directory to env.readTextFile, as in your code example? If not, what exactly is the problem?



Re: Read all JSON files from an HDFS partition folder

Andrey Zagrebin
Hi Rakesh,

So the problem is that you want your Flink job to monitor the '/data/ingestion/ingestion-raw-product' path for new files inside it and process them when they appear, right?

Can you try env.readFile with watchType = FileProcessingMode.PROCESS_CONTINUOUSLY? For a usage example, see how env.readTextFile(String filePath) and env.readTextFile(String filePath, String charsetName) are implemented on top of env.readFile, only with FileProcessingMode.PROCESS_ONCE, which processes just the files currently in the path. A sketch of the continuous variant follows below.

Best,
Andrey
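A minimal sketch of that suggestion, assuming Flink's TextInputFormat and a 60-second scan interval (both illustrative choices); setNestedFileEnumeration makes the format descend into the nested year/month/day/hour partition folders described in the quoted message below:

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class MonitorPartitionFolders {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String basePath = "hdfs://localhost:8020/data/ingestion/ingestion-raw-product";

        TextInputFormat format = new TextInputFormat(new Path(basePath));
        // Recurse into nested partition folders such as 2018/12/05/23.
        format.setNestedFileEnumeration(true);

        // PROCESS_CONTINUOUSLY re-scans the path and picks up new files;
        // the last argument is the scan interval in milliseconds.
        DataStream<String> lines = env.readFile(
                format, basePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 60_000L);

        lines.print();
        env.execute("monitor-partition-folders");
    }
}

One caveat: with PROCESS_CONTINUOUSLY, Flink re-processes a file entirely if it is modified, so files should be written elsewhere and moved into the monitored folder atomically.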

On 13 Dec 2018, at 07:22, Rakesh Kumar <[hidden email]> wrote:

The problem I am facing is that I want to read JSON files present in HDFS partition folders.

Suppose a partition folder is /data/ingestion/ingestion-raw-product/2018/12/05/23. I have to read from the base path /data/ingestion/ingestion-raw-product, because the folders inside /data/ingestion/ingestion-raw-product/ change whenever new data arrives. For this, I have written a Flink program to read all JSON files from HDFS.

Example: the paths are /data/ingestion/ingestion-raw-product/2018/12/05/23 and /data/ingestion/ingestion-raw-product/2018/12/12/23. To read all the JSON files under them I used the pattern /data/ingestion/ingestion-raw-product/*/*/*/, but I am not able to read anything with it.

Can you suggest a solution for this?

Thanks,
Rakesh
