Hi guys, is there a limit on the number of files a Flink DataSet can read? My question is whether there will be any issues if I have, say, millions of files to read to create a single DataSet. Thanks |
Hi Darshan, In a distributed scenario there is no limit in theory, but real-world conditions still impose constraints, such as the size of the individual files, the memory configured for your TaskManagers (TM), and so on. Also, is your "single" dataset meant logically or physically? That is, is it physically read by multiple parallel source subtask instances? Thanks, vino. Darshan Singh <[hidden email]> wrote on Tue, Aug 14, 2018 at 3:01 AM:
|
In reply to this post by Darshan Singh
It causes more overhead (processes, etc.), which might make it slower. Furthermore, if the files are stored on HDFS, the bottleneck is the NameNode, which will have to answer millions of requests. The latter point will change in future Hadoop versions with http://ozone.hadoop.apache.org/
|
Hi,
Flink InputFormats generate their InputSplits sequentially on the JobManager (JM). These splits are stored in the heap of the JM process and handed out lazily to SourceTasks when they request them. Split assignment is done by an InputSplitAssigner, which can be customized. FileInputFormats typically use a LocatableInputSplitAssigner, which tries to assign splits based on locality.
I see three potential problems:
1) InputSplit generation might take a long while. The JM is blocked until the splits are generated.
2) All InputSplits need to be stored on the JM heap. You might need to assign more memory to the JM process.
3) Split assignment might take a while, depending on the complexity of the InputSplitAssigner. You can implement a custom assigner to make this more efficient (from an assignment point of view).
Best, Fabian
2018-08-14 8:19 GMT+02:00 Jörn Franke <[hidden email]>:
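To make the three points above concrete, here is a minimal, self-contained sketch of the pattern Fabian describes: all splits are created up front and kept in memory, then handed out one at a time with a preference for local splits. This is a simplified model, not Flink's code. The names `SplitAssignmentSketch`, `Split`, `LocalityAwareAssigner`, and `createSplits` are hypothetical and only loosely mirror `InputSplit` and `LocatableInputSplitAssigner`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified model of split handling. NOT Flink's actual classes.
class SplitAssignmentSketch {

    /** One unit of work: a file and the host that stores it. */
    static final class Split {
        final String path;
        final String host;

        Split(String path, String host) {
            this.path = path;
            this.host = host;
        }
    }

    /** Keeps every unassigned split in memory and hands them out on request. */
    static final class LocalityAwareAssigner {
        private final List<Split> remaining;

        LocalityAwareAssigner(List<Split> splits) {
            this.remaining = new ArrayList<>(splits);
        }

        /** Returns a split on the requesting host if one is left, else any split, else null. */
        synchronized Split getNextSplit(String requestingHost) {
            // Linear scan: the cost of one request grows with the number of remaining splits.
            for (Iterator<Split> it = remaining.iterator(); it.hasNext();) {
                Split s = it.next();
                if (s.host.equals(requestingHost)) {
                    it.remove();
                    return s;
                }
            }
            return remaining.isEmpty() ? null : remaining.remove(remaining.size() - 1);
        }

        synchronized int remainingCount() {
            return remaining.size();
        }
    }

    /** Generates all splits sequentially; with millions of files this list is what fills the heap. */
    static List<Split> createSplits(int fileCount, String[] hosts) {
        List<Split> splits = new ArrayList<>(fileCount);
        for (int i = 0; i < fileCount; i++) {
            splits.add(new Split("file-" + i, hosts[i % hosts.length]));
        }
        return splits;
    }

    public static void main(String[] args) {
        String[] hosts = {"host-a", "host-b"};
        LocalityAwareAssigner assigner = new LocalityAwareAssigner(createSplits(6, hosts));
        Split s;
        while ((s = assigner.getNextSplit("host-a")) != null) {
            System.out.println(s.path + " on " + s.host);
        }
    }
}
```

In this toy model the scan in `getNextSplit` is linear in the number of remaining splits, which is the kind of per-request cost that point 3 refers to. A custom assigner could, for example, index splits by host to avoid the scan.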
|
Thanks all for your responses. This is much clearer to me now. Thanks On Tue, Aug 14, 2018 at 9:46 AM, Fabian Hueske <[hidden email]> wrote:
|
In reply to this post by Fabian Hueske-2
Thanks for the details. I got it working. I have around one directory per month, and I am running over 12-15 months of data, so I created a DataSet for each month and did a union. However, when I run the job I get an HTTP timeout. I am reading more than 120K files in total across all months. I am using S3 and EMR, and the Flink version is 1.4.2. When I run over 6 months, it works fine. Below is part of the error:
Caused by: java.io.IOException: Error opening the Input Split s3://XXXX.gz [0,-1]: Unable to execute HTTP request: Timeout waiting for connection from pool
    at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:705)
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:477)
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:48)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:145)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1114)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1064)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1409)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectCall.perform(GetObjectCall.java:22)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectCall.perform(GetObjectCall.java:9)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:80)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObject(AmazonS3LiteClient.java:99)
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrievePair(Jets3tNativeFileSystemStore.java:452)
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrievePair(Jets3tNativeFileSystemStore.java:439)
    at sun.reflect.GeneratedMethodAccessor32.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
    at com.sun.proxy.$Proxy28.retrievePair(Unknown Source)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:1213)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:790)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:166)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
    at org.apache.flink.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:865)
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:286)
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:263)
    at sun.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.$Proxy30.get(Unknown Source)
Thanks
On Tue, Aug 14, 2018 at 9:46 AM, Fabian Hueske <[hidden email]> wrote:
|
Hi Darshan, This looks like a file system configuration issue to me. Flink supports different file systems for S3 and there are also a few tuning knobs. Did you have a look at the docs for file system configuration [1]? Best, Fabian 2018-08-14 20:45 GMT+02:00 Darshan Singh <[hidden email]>:
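For readers unfamiliar with the innermost exception in Darshan's trace, "Timeout waiting for connection from pool" is what a bounded HTTP connection pool reports when every connection is leased and none is returned within the wait time. The sketch below models that mechanism with a plain semaphore. It is an illustration only: `BoundedPoolSketch`, `tryLease`, and `countTimeouts` are hypothetical names, the pool sizes and timeout are made-up values, and whether too many concurrently open streams is the actual cause in this job is an assumption, not something the thread establishes.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a bounded HTTP connection pool. Not AWS SDK or EMRFS code.
class BoundedPoolSketch {
    private final Semaphore permits;
    private final long timeoutMillis;

    BoundedPoolSketch(int maxConnections, long timeoutMillis) {
        this.permits = new Semaphore(maxConnections);
        this.timeoutMillis = timeoutMillis;
    }

    /** Tries to lease a connection; returns false if none frees up within the timeout. */
    boolean tryLease() {
        try {
            return permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Returns a leased connection to the pool. */
    void release() {
        permits.release();
    }

    /** Opens `streams` inputs without closing any and counts how many leases time out. */
    static int countTimeouts(int maxConnections, int streams) {
        BoundedPoolSketch pool = new BoundedPoolSketch(maxConnections, 20);
        int timeouts = 0;
        for (int i = 0; i < streams; i++) {
            if (!pool.tryLease()) {
                timeouts++;
            }
        }
        return timeouts;
    }

    public static void main(String[] args) {
        // Six open streams against a pool of four: the last two leases time out.
        System.out.println("pool=4, streams=6, timeouts=" + countTimeouts(4, 6));
        // The same six streams fit once the pool is larger.
        System.out.println("pool=8, streams=6, timeouts=" + countTimeouts(8, 6));
    }
}
```

In this model the timeouts disappear either when the pool is made larger or when connections are released sooner, which is the general kind of file system tuning Fabian points to.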
|