Limit on number of files to read for Dataset

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

Limit on number of files to read for Dataset

Darshan Singh
Hi Guys,

Is there a limit on number of files flink dataset can read? My question is will there be any sort of issues if I have say millions of files to read to create single dataset.

Thanks
Reply | Threaded
Open this post in threaded view
|

Re: Limit on number of files to read for Dataset

vino yang
Hi Darshan,

In a distributed scenario, there is no limit in theory, but there are still some real-world conditions that will cause some constraints, such as the size of your individual files, the memory size of your TM configuration, and so on. 
In addition, your "single" here is logical or physical, I mean, is it physically multiple parallel source sub task instances?

Thanks, vino.

Darshan Singh <[hidden email]> 于2018年8月14日周二 上午3:01写道:
Hi Guys,

Is there a limit on number of files flink dataset can read? My question is will there be any sort of issues if I have say millions of files to read to create single dataset.

Thanks
Reply | Threaded
Open this post in threaded view
|

Re: Limit on number of files to read for Dataset

Jörn Franke
In reply to this post by Darshan Singh
It causes more overhead (processes etc) which might make it slower. Furthermore if you have them stored on HDFS then the bottleneck is the namenode which will have to answer millions of requests. 
The latter point will change in future Hadoop versions with http://ozone.hadoop.apache.org/

On 13. Aug 2018, at 21:01, Darshan Singh <[hidden email]> wrote:

Hi Guys,

Is there a limit on number of files flink dataset can read? My question is will there be any sort of issues if I have say millions of files to read to create single dataset.

Thanks
Reply | Threaded
Open this post in threaded view
|

Re: Limit on number of files to read for Dataset

Fabian Hueske-2
Hi,

Flink InputFormats generate their InputSplits sequentially on the JobManager.
These splits are stored in the heap of the JM process and handed out to SourceTasks when they request them lazily.
Split assignment is done by a InputSplitAssigner, that can be customized. FileInputFormats typically use a LocatableInputSplitAssigner which tries to assign splits based on locality.

I see three potential problems:
1) InputSplit generation might take a long while. The JM is blocked until splits are generated.
2) All InputSplits need to be stored on the JM heap. You might need to assign more memory to the JM process.
3) Split assignment might take a while depending on the complexity of the InputSplitAssigner. You can implement a custom assigner to make this more efficient (from an assignment point of view).

Best, Fabian

2018-08-14 8:19 GMT+02:00 Jörn Franke <[hidden email]>:
It causes more overhead (processes etc) which might make it slower. Furthermore if you have them stored on HDFS then the bottleneck is the namenode which will have to answer millions of requests. 
The latter point will change in future Hadoop versions with http://ozone.hadoop.apache.org/

On 13. Aug 2018, at 21:01, Darshan Singh <[hidden email]> wrote:

Hi Guys,

Is there a limit on number of files flink dataset can read? My question is will there be any sort of issues if I have say millions of files to read to create single dataset.

Thanks

Reply | Threaded
Open this post in threaded view
|

Re: Limit on number of files to read for Dataset

Darshan Singh
Thanks all for your responses. I am now much more clearer on this.

Thanks

On Tue, Aug 14, 2018 at 9:46 AM, Fabian Hueske <[hidden email]> wrote:
Hi,

Flink InputFormats generate their InputSplits sequentially on the JobManager.
These splits are stored in the heap of the JM process and handed out to SourceTasks when they request them lazily.
Split assignment is done by a InputSplitAssigner, that can be customized. FileInputFormats typically use a LocatableInputSplitAssigner which tries to assign splits based on locality.

I see three potential problems:
1) InputSplit generation might take a long while. The JM is blocked until splits are generated.
2) All InputSplits need to be stored on the JM heap. You might need to assign more memory to the JM process.
3) Split assignment might take a while depending on the complexity of the InputSplitAssigner. You can implement a custom assigner to make this more efficient (from an assignment point of view).

Best, Fabian

2018-08-14 8:19 GMT+02:00 Jörn Franke <[hidden email]>:
It causes more overhead (processes etc) which might make it slower. Furthermore if you have them stored on HDFS then the bottleneck is the namenode which will have to answer millions of requests. 
The latter point will change in future Hadoop versions with http://ozone.hadoop.apache.org/

On 13. Aug 2018, at 21:01, Darshan Singh <[hidden email]> wrote:

Hi Guys,

Is there a limit on number of files flink dataset can read? My question is will there be any sort of issues if I have say millions of files to read to create single dataset.

Thanks


Reply | Threaded
Open this post in threaded view
|

Re: Limit on number of files to read for Dataset

Darshan Singh
In reply to this post by Fabian Hueske-2
Thanks for the details. I got it working. I have around 1 directory for each month and I am running for 12-15 month data.So I created a dataset from each month and did a union.

However, when I run I get the HTTP timeout issue. I am reading more than 120K files in total in all of months.

I am using S3 and emr to do this and flink version is 1.4.2. When I run for 6 months this works fine.

Below is part of error

Caused by: java.io.IOException: Error opening the Input Split s3://XXXX.gz [0,-1]: Unable to execute HTTP request: Timeout waiting for connection from pool
    at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:705)
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:477)
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:48)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:145)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1114)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1064)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1409)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectCall.perform(GetObjectCall.java:22)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectCall.perform(GetObjectCall.java:9)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:80)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObject(AmazonS3LiteClient.java:99)
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrievePair(Jets3tNativeFileSystemStore.java:452)
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrievePair(Jets3tNativeFileSystemStore.java:439)
    at sun.reflect.GeneratedMethodAccessor32.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
    at com.sun.proxy.$Proxy28.retrievePair(Unknown Source)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:1213)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:790)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:166)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
    at org.apache.flink.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:865)
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:286)
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:263)
    at sun.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.$Proxy30.get(Unknown Source)

Thanks

On Tue, Aug 14, 2018 at 9:46 AM, Fabian Hueske <[hidden email]> wrote:
Hi,

Flink InputFormats generate their InputSplits sequentially on the JobManager.
These splits are stored in the heap of the JM process and handed out to SourceTasks when they request them lazily.
Split assignment is done by a InputSplitAssigner, that can be customized. FileInputFormats typically use a LocatableInputSplitAssigner which tries to assign splits based on locality.

I see three potential problems:
1) InputSplit generation might take a long while. The JM is blocked until splits are generated.
2) All InputSplits need to be stored on the JM heap. You might need to assign more memory to the JM process.
3) Split assignment might take a while depending on the complexity of the InputSplitAssigner. You can implement a custom assigner to make this more efficient (from an assignment point of view).

Best, Fabian

2018-08-14 8:19 GMT+02:00 Jörn Franke <[hidden email]>:
It causes more overhead (processes etc) which might make it slower. Furthermore if you have them stored on HDFS then the bottleneck is the namenode which will have to answer millions of requests. 
The latter point will change in future Hadoop versions with http://ozone.hadoop.apache.org/

On 13. Aug 2018, at 21:01, Darshan Singh <[hidden email]> wrote:

Hi Guys,

Is there a limit on number of files flink dataset can read? My question is will there be any sort of issues if I have say millions of files to read to create single dataset.

Thanks


Reply | Threaded
Open this post in threaded view
|

Re: Limit on number of files to read for Dataset

Fabian Hueske-2
Hi Darshan,

This looks like a file system configuration issue to me.
Flink supports different file systems for S3 and there are also a few tuning knobs.

Did you have a look at the docs for file system configuration [1]?

Best, Fabian


2018-08-14 20:45 GMT+02:00 Darshan Singh <[hidden email]>:
Thanks for the details. I got it working. I have around 1 directory for each month and I am running for 12-15 month data.So I created a dataset from each month and did a union.

However, when I run I get the HTTP timeout issue. I am reading more than 120K files in total in all of months.

I am using S3 and emr to do this and flink version is 1.4.2. When I run for 6 months this works fine.

Below is part of error

Caused by: java.io.IOException: Error opening the Input Split s3://XXXX.gz [0,-1]: Unable to execute HTTP request: Timeout waiting for connection from pool
    at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:705)
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:477)
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:48)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:145)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1114)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1064)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1409)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectCall.perform(GetObjectCall.java:22)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectCall.perform(GetObjectCall.java:9)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:80)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObject(AmazonS3LiteClient.java:99)
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrievePair(Jets3tNativeFileSystemStore.java:452)
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrievePair(Jets3tNativeFileSystemStore.java:439)
    at sun.reflect.GeneratedMethodAccessor32.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
    at com.sun.proxy.$Proxy28.retrievePair(Unknown Source)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:1213)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:790)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:166)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
    at org.apache.flink.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:865)
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:286)
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:263)
    at sun.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.$Proxy30.get(Unknown Source)

Thanks

On Tue, Aug 14, 2018 at 9:46 AM, Fabian Hueske <[hidden email]> wrote:
Hi,

Flink InputFormats generate their InputSplits sequentially on the JobManager.
These splits are stored in the heap of the JM process and handed out to SourceTasks when they request them lazily.
Split assignment is done by a InputSplitAssigner, that can be customized. FileInputFormats typically use a LocatableInputSplitAssigner which tries to assign splits based on locality.

I see three potential problems:
1) InputSplit generation might take a long while. The JM is blocked until splits are generated.
2) All InputSplits need to be stored on the JM heap. You might need to assign more memory to the JM process.
3) Split assignment might take a while depending on the complexity of the InputSplitAssigner. You can implement a custom assigner to make this more efficient (from an assignment point of view).

Best, Fabian

2018-08-14 8:19 GMT+02:00 Jörn Franke <[hidden email]>:
It causes more overhead (processes etc) which might make it slower. Furthermore if you have them stored on HDFS then the bottleneck is the namenode which will have to answer millions of requests. 
The latter point will change in future Hadoop versions with http://ozone.hadoop.apache.org/

On 13. Aug 2018, at 21:01, Darshan Singh <[hidden email]> wrote:

Hi Guys,

Is there a limit on number of files flink dataset can read? My question is will there be any sort of issues if I have say millions of files to read to create single dataset.

Thanks