Flink Dataset job submission very slow


ysn2233
I have many lzo files on HDFS in such path format: /logs/{id}/{date}/xxx[1-100].lzo

/logs/a/ds=2018-01-01/xxx1.lzo
/logs/b/ds=2018-01-01/xxx1.lzo
...
/logs/z/ds=2018-01-02/xxx1.lzo
...
/logs/z/ds=2020-05-01/xxx100.lzo

I'm using the Flink DataSet API to read those files for a range of {date} and apply some transformations. Since Flink does not officially provide an LZO input format, I use HadoopInputFormat to implement this. I currently cannot find a good way to give a single file-glob path that includes all the files I need, so I have to do it in a loop.

for (LocalDate date = startDate; !date.isAfter(endDate); date = date.plusDays(1)) {
    // glob one day at a time, e.g. /logs/*/ds=2018-01-01/*.lzo
    FileStatus[] fileStatuses = fs.globStatus(new org.apache.hadoop.fs.Path(
            "/logs/*", "ds=" + date.format(DateTimeFormatter.ISO_LOCAL_DATE) + "/*.lzo"));
    for (FileStatus fileStatus : fileStatuses) {
        lzoFiles.add(fileStatus.getPath().toString());
    }
}
// ...
// I have to initialize the source like that
// register all collected paths as one comma-separated input list
Job job = Job.getInstance(conf);
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, String.join(",", lzoFiles));
lzoSets = env.createInput(HadoopInputs.createHadoopInput(
        new LzoTextInputFormat(),
        LongWritable.class,
        Text.class,
        job)).name("lzo source").map(x -> x.f1.toString());
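(As an aside on the per-date loop: Hadoop globs support {a,b} alternation, so the paths for a whole date range could possibly be collected with a single globStatus call. A minimal sketch of building such a glob string, with a hypothetical helper name; whether one big glob is actually faster than the loop would need to be measured.)

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

public class GlobBuilder {

    // Build one Hadoop glob such as /logs/*/ds={2018-01-01,2018-01-02}/*.lzo
    // covering every day from start to end (inclusive).
    static String dateRangeGlob(LocalDate start, LocalDate end) {
        List<String> days = new ArrayList<>();
        for (LocalDate d = start; !d.isAfter(end); d = d.plusDays(1)) {
            days.add(d.format(DateTimeFormatter.ISO_LOCAL_DATE));
        }
        return "/logs/*/ds={" + String.join(",", days) + "}/*.lzo";
    }

    public static void main(String[] args) {
        System.out.println(dateRangeGlob(LocalDate.of(2018, 1, 1), LocalDate.of(2018, 1, 2)));
        // prints /logs/*/ds={2018-01-01,2018-01-02}/*.lzo
    }
}
```

The resulting string could then be passed to fs.globStatus once instead of once per day.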

However, when I submit this job to Flink with a relatively long date range of files to read, the job submission takes a very long time to prepare (10 minutes, sometimes 20 minutes or more; I have already increased akka.timeout and web.timeout), which I cannot accept. It seems Flink spends a lot of time optimizing the execution plan. Is there a good approach to make my program prepare more quickly?
Shengnan

Re: Flink Dataset job submission very slow

Arvid Heise-3
Do you have any logs that could help us identify the issue? How many files does a long date range include?

In general, you could try the same program with the DataStream API (use StreamExecutionEnvironment#readFile [1] with PROCESS_ONCE to get behavior equivalent to batch). DataStream programs are only lightly optimized at submission time. Since the DataSet API will eventually be deprecated, that is also the recommended way forward if you don't need features specific to the DataSet API.
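A minimal sketch of that readFile/PROCESS_ONCE approach, using Flink's built-in TextInputFormat just to show the structure (reading the LZO files would still require an LZO-capable Flink FileInputFormat, which is not shown here):

```java
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ReadFileOnce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // PROCESS_ONCE scans the path a single time and then finishes,
        // which behaves like a batch read; the watch interval is then irrelevant.
        TextInputFormat format = new TextInputFormat(new Path("hdfs:///logs"));
        format.setNestedFileEnumeration(true); // recurse into /logs/{id}/ds=.../

        DataStream<String> lines =
                env.readFile(format, "hdfs:///logs", FileProcessingMode.PROCESS_ONCE, -1);

        lines.print();
        env.execute("read logs once");
    }
}
```

With PROCESS_ONCE the file enumeration happens on the cluster when the source starts, rather than during client-side plan preparation, which is the part of submission that was reported as slow.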


On Tue, May 19, 2020 at 7:48 AM ysnakie <[hidden email]> wrote:


--
Arvid Heise | Senior Java Developer

Follow us @VervericaData

Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng