Flink snapshotting to S3 - Timeout waiting for connection from pool

classic Classic list List threaded Threaded
9 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink snapshotting to S3 - Timeout waiting for connection from pool

Shannon Carey
I'm having pretty frequent issues with the exception below. It basically always ends up killing my cluster after forcing a large number of job restarts. I just can't keep Flink up & running.

I am running Flink 1.1.3 on EMR 5.2.0. I already tried updating the emrfs-site config fs.s3.maxConnections from the default (50) to 75, after AWS support told me the name of the config option. However, that hasn't fixed the problem. Assuming that increasing the maxConnections again doesn't fix the problem, is there anything else I can do? Is anyone else having this problem? Is it possible that the state backend isn't properly calling close() on its filesystem objects? Or is there a large number of concurrent open filesystem objects for some reason? I am using the default checkpointing settings with one checkpoint at a time, checkpointing every 10 minutes. If I am reading the metrics correctly, the checkpoint duration is between 12s and 3 minutes on one of the jobs, and 5s or less on the other 3. Any help is appreciated.

java.lang.RuntimeException: Could not initialize state backend. 
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:121)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setup(AbstractUdfStreamOperator.java:82)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:276)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AmazonClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:618)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1015)
at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:22)
at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:7)
at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:75)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:94)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:39)
at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:211)
at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy34.retrieveMetadata(Unknown Source)
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:764)
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.mkdir(S3NativeFileSystem.java:1169)
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.mkdirs(S3NativeFileSystem.java:1162)
at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1877)
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.mkdirs(EmrFileSystem.java:399)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:429)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.initializeForJob(FsStateBackend.java:249)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.initializeForJob(RocksDBStateBackend.java:237)
at org.apache.flink.streaming.runtime.tasks.StreamTask.createStateBackend(StreamTask.java:718)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:119)
... 13 more
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:226)
at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:195)
at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.$Proxy36.getConnection(Unknown Source)
at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:423)
at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:837)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
... 41 more
Reply | Threaded
Open this post in threaded view
|

Re: Flink snapshotting to S3 - Timeout waiting for connection from pool

Ufuk Celebi
Hey Shannon!

Is this always reproducible and how long does it take to reproduce it?

I've not seen this error before but as you say it indicates that some
streams are not closed.

Did the jobs do any restarts before this happened? Flink 1.1.4
contains fixes for more robust releasing of resources in failure
scenarios. Is trying 1.1.4 an option?

– Ufuk

On Thu, Jan 12, 2017 at 1:18 AM, Shannon Carey <[hidden email]> wrote:

> I'm having pretty frequent issues with the exception below. It basically
> always ends up killing my cluster after forcing a large number of job
> restarts. I just can't keep Flink up & running.
>
> I am running Flink 1.1.3 on EMR 5.2.0. I already tried updating the
> emrfs-site config fs.s3.maxConnections from the default (50) to 75, after
> AWS support told me the name of the config option. However, that hasn't
> fixed the problem. Assuming that increasing the maxConnections again doesn't
> fix the problem, is there anything else I can do? Is anyone else having this
> problem? Is it possible that the state backend isn't properly calling
> close() on its filesystem objects? Or is there a large number of concurrent
> open filesystem objects for some reason? I am using the default
> checkpointing settings with one checkpoint at a time, checkpointing every 10
> minutes. If I am reading the metrics correctly, the checkpoint duration is
> between 12s and 3 minutes on one of the jobs, and 5s or less on the other 3.
> Any help is appreciated.
>
> java.lang.RuntimeException: Could not initialize state backend.
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:121)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setup(AbstractUdfStreamOperator.java:82)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:276)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:105)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
> at java.lang.Thread.run(Thread.java:745)
> Caused by:
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AmazonClientException:
> Unable to execute HTTP request: Timeout waiting for connection from pool
> at
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:618)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1015)
> at
> com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:22)
> at
> com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:7)
> at
> com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:75)
> at
> com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
> at
> com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:94)
> at
> com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:39)
> at
> com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:211)
> at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy34.retrieveMetadata(Unknown Source)
> at
> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:764)
> at
> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.mkdir(S3NativeFileSystem.java:1169)
> at
> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.mkdirs(S3NativeFileSystem.java:1162)
> at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1877)
> at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.mkdirs(EmrFileSystem.java:399)
> at
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:429)
> at
> org.apache.flink.runtime.state.filesystem.FsStateBackend.initializeForJob(FsStateBackend.java:249)
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.initializeForJob(RocksDBStateBackend.java:237)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.createStateBackend(StreamTask.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:119)
> ... 13 more
> Caused by:
> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException:
> Timeout waiting for connection from pool
> at
> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:226)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:195)
> at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.$Proxy36.getConnection(Unknown
> Source)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:423)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:837)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
> ... 41 more
Reply | Threaded
Open this post in threaded view
|

Re: Flink snapshotting to S3 - Timeout waiting for connection from pool

Chen Qin
We have seen this issue back to Flink 1.0. Our finding back then was traffic congestion to AWS in internal network. Many teams too dependent on S3 and bandwidth is shared, cause traffic congestion from time to time.

Hope it helps!

Thanks
Chen

> On Jan 12, 2017, at 03:30, Ufuk Celebi <[hidden email]> wrote:
>
> Hey Shannon!
>
> Is this always reproducible and how long does it take to reproduce it?
>
> I've not seen this error before but as you say it indicates that some
> streams are not closed.
>
> Did the jobs do any restarts before this happened? Flink 1.1.4
> contains fixes for more robust releasing of resources in failure
> scenarios. Is trying 1.1.4 an option?
>
> – Ufuk
>
>> On Thu, Jan 12, 2017 at 1:18 AM, Shannon Carey <[hidden email]> wrote:
>> I'm having pretty frequent issues with the exception below. It basically
>> always ends up killing my cluster after forcing a large number of job
>> restarts. I just can't keep Flink up & running.
>>
>> I am running Flink 1.1.3 on EMR 5.2.0. I already tried updating the
>> emrfs-site config fs.s3.maxConnections from the default (50) to 75, after
>> AWS support told me the name of the config option. However, that hasn't
>> fixed the problem. Assuming that increasing the maxConnections again doesn't
>> fix the problem, is there anything else I can do? Is anyone else having this
>> problem? Is it possible that the state backend isn't properly calling
>> close() on its filesystem objects? Or is there a large number of concurrent
>> open filesystem objects for some reason? I am using the default
>> checkpointing settings with one checkpoint at a time, checkpointing every 10
>> minutes. If I am reading the metrics correctly, the checkpoint duration is
>> between 12s and 3 minutes on one of the jobs, and 5s or less on the other 3.
>> Any help is appreciated.
>>
>> java.lang.RuntimeException: Could not initialize state backend.
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:121)
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setup(AbstractUdfStreamOperator.java:82)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:276)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:105)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by:
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AmazonClientException:
>> Unable to execute HTTP request: Timeout waiting for connection from pool
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:618)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1015)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:22)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:7)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:75)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:94)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:39)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:211)
>> at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>> at
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>> at com.sun.proxy.$Proxy34.retrieveMetadata(Unknown Source)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:764)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.mkdir(S3NativeFileSystem.java:1169)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.mkdirs(S3NativeFileSystem.java:1162)
>> at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1877)
>> at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.mkdirs(EmrFileSystem.java:399)
>> at
>> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:429)
>> at
>> org.apache.flink.runtime.state.filesystem.FsStateBackend.initializeForJob(FsStateBackend.java:249)
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.initializeForJob(RocksDBStateBackend.java:237)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.createStateBackend(StreamTask.java:718)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:119)
>> ... 13 more
>> Caused by:
>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException:
>> Timeout waiting for connection from pool
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:226)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:195)
>> at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.$Proxy36.getConnection(Unknown
>> Source)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:423)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:837)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
>> ... 41 more
Reply | Threaded
Open this post in threaded view
|

Re: Flink snapshotting to S3 - Timeout waiting for connection from pool

Shannon Carey
In reply to this post by Ufuk Celebi
I can't predict when it will occur, but usually it's after Flink has been running for at least a week.

Yes, I do believe we had several job restarts due to an exception due to a Cassandra node being down for maintenance and therefore a query failing to meet the QUORUM consistency level requested. I'm fixing the retry consistency logic there, but I'm sure we'll run into failing jobs again eventually.

I'm upgrading to 1.1.4 now, hopefully it will help.


-Shannon

On 1/12/17, 5:30 AM, "Ufuk Celebi" <[hidden email]> wrote:

>Hey Shannon!
>
>Is this always reproducible and how long does it take to reproduce it?
>
>I've not seen this error before but as you say it indicates that some
>streams are not closed.
>
>Did the jobs do any restarts before this happened? Flink 1.1.4
>contains fixes for more robust releasing of resources in failure
>scenarios. Is trying 1.1.4 an option?
>
>– Ufuk
>
>On Thu, Jan 12, 2017 at 1:18 AM, Shannon Carey <[hidden email]> wrote:
>> I'm having pretty frequent issues with the exception below. It basically
>> always ends up killing my cluster after forcing a large number of job
>> restarts. I just can't keep Flink up & running.
>>
>> I am running Flink 1.1.3 on EMR 5.2.0. I already tried updating the
>> emrfs-site config fs.s3.maxConnections from the default (50) to 75, after
>> AWS support told me the name of the config option. However, that hasn't
>> fixed the problem. Assuming that increasing the maxConnections again doesn't
>> fix the problem, is there anything else I can do? Is anyone else having this
>> problem? Is it possible that the state backend isn't properly calling
>> close() on its filesystem objects? Or is there a large number of concurrent
>> open filesystem objects for some reason? I am using the default
>> checkpointing settings with one checkpoint at a time, checkpointing every 10
>> minutes. If I am reading the metrics correctly, the checkpoint duration is
>> between 12s and 3 minutes on one of the jobs, and 5s or less on the other 3.
>> Any help is appreciated.
>>
>> java.lang.RuntimeException: Could not initialize state backend.
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:121)
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setup(AbstractUdfStreamOperator.java:82)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:276)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:105)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by:
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AmazonClientException:
>> Unable to execute HTTP request: Timeout waiting for connection from pool
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:618)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1015)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:22)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:7)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:75)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:94)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:39)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:211)
>> at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>> at
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>> at com.sun.proxy.$Proxy34.retrieveMetadata(Unknown Source)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:764)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.mkdir(S3NativeFileSystem.java:1169)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.mkdirs(S3NativeFileSystem.java:1162)
>> at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1877)
>> at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.mkdirs(EmrFileSystem.java:399)
>> at
>> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:429)
>> at
>> org.apache.flink.runtime.state.filesystem.FsStateBackend.initializeForJob(FsStateBackend.java:249)
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.initializeForJob(RocksDBStateBackend.java:237)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.createStateBackend(StreamTask.java:718)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:119)
>> ... 13 more
>> Caused by:
>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException:
>> Timeout waiting for connection from pool
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:226)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:195)
>> at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.$Proxy36.getConnection(Unknown
>> Source)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:423)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:837)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
>> ... 41 more
>
Reply | Threaded
Open this post in threaded view
|

Re: Flink snapshotting to S3 - Timeout waiting for connection from pool

Shannon Carey
In reply to this post by Chen Qin
Good to know someone else has had the same problem... What did you do about it? Did it resolve on its own?

-Shannon




On 1/12/17, 11:55 AM, "Chen Qin" <[hidden email]> wrote:

>We have seen this issue back to Flink 1.0. Our finding back then was traffic congestion to AWS in internal network. Many teams too dependent on S3 and bandwidth is shared, cause traffic congestion from time to time.
>
>Hope it helps!
>
>Thanks
>Chen
>
>> On Jan 12, 2017, at 03:30, Ufuk Celebi <[hidden email]> wrote:
>>
>> Hey Shannon!
>>
>> Is this always reproducible and how long does it take to reproduce it?
>>
>> I've not seen this error before but as you say it indicates that some
>> streams are not closed.
>>
>> Did the jobs do any restarts before this happened? Flink 1.1.4
>> contains fixes for more robust releasing of resources in failure
>> scenarios. Is trying 1.1.4 an option?
>>
>> – Ufuk
>>
>>> On Thu, Jan 12, 2017 at 1:18 AM, Shannon Carey <[hidden email]> wrote:
>>> I'm having pretty frequent issues with the exception below. It basically
>>> always ends up killing my cluster after forcing a large number of job
>>> restarts. I just can't keep Flink up & running.
>>>
>>> I am running Flink 1.1.3 on EMR 5.2.0. I already tried updating the
>>> emrfs-site config fs.s3.maxConnections from the default (50) to 75, after
>>> AWS support told me the name of the config option. However, that hasn't
>>> fixed the problem. Assuming that increasing the maxConnections again doesn't
>>> fix the problem, is there anything else I can do? Is anyone else having this
>>> problem? Is it possible that the state backend isn't properly calling
>>> close() on its filesystem objects? Or is there a large number of concurrent
>>> open filesystem objects for some reason? I am using the default
>>> checkpointing settings with one checkpoint at a time, checkpointing every 10
>>> minutes. If I am reading the metrics correctly, the checkpoint duration is
>>> between 12s and 3 minutes on one of the jobs, and 5s or less on the other 3.
>>> Any help is appreciated.
>>>
>>> java.lang.RuntimeException: Could not initialize state backend.
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:121)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setup(AbstractUdfStreamOperator.java:82)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:276)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:105)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by:
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AmazonClientException:
>>> Unable to execute HTTP request: Timeout waiting for connection from pool
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:618)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1015)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:22)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:7)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:75)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:94)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:39)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:211)
>>> at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at
>>> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>>> at
>>> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>> at com.sun.proxy.$Proxy34.retrieveMetadata(Unknown Source)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:764)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.mkdir(S3NativeFileSystem.java:1169)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.mkdirs(S3NativeFileSystem.java:1162)
>>> at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1877)
>>> at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.mkdirs(EmrFileSystem.java:399)
>>> at
>>> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:429)
>>> at
>>> org.apache.flink.runtime.state.filesystem.FsStateBackend.initializeForJob(FsStateBackend.java:249)
>>> at
>>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.initializeForJob(RocksDBStateBackend.java:237)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.createStateBackend(StreamTask.java:718)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:119)
>>> ... 13 more
>>> Caused by:
>>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException:
>>> Timeout waiting for connection from pool
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:226)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:195)
>>> at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.$Proxy36.getConnection(Unknown
>>> Source)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:423)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:837)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
>>> ... 41 more
>
Reply | Threaded
Open this post in threaded view
|

Re: Flink snapshotting to S3 - Timeout waiting for connection from pool

Stephan Ewen
Hi Shannon!

I was wondering if you still see this issue in Flink 1.1.4?

Just thinking that another possible cause for the issue could be that there is a connection leak somewhere (Flink code or user code or vendor library) and thus the S3 connector's connection pool starves.
For Flink 1.2, there is a safetynet that tracks and closes connections that go through Flink's filesystem abstraction. So that should not be an issue there any more.

Best,
Stephan



On Fri, Jan 13, 2017 at 1:04 AM, Shannon Carey <[hidden email]> wrote:
Good to know someone else has had the same problem... What did you do about it? Did it resolve on its own?

-Shannon




On 1/12/17, 11:55 AM, "Chen Qin" <[hidden email]> wrote:

>We have seen this issue back to Flink 1.0. Our finding back then was traffic congestion to AWS in internal network. Many teams too dependent on S3 and bandwidth is shared, cause traffic congestion from time to time.
>
>Hope it helps!
>
>Thanks
>Chen
>
>> On Jan 12, 2017, at 03:30, Ufuk Celebi <[hidden email]> wrote:
>>
>> Hey Shannon!
>>
>> Is this always reproducible and how long does it take to reproduce it?
>>
>> I've not seen this error before but as you say it indicates that some
>> streams are not closed.
>>
>> Did the jobs do any restarts before this happened? Flink 1.1.4
>> contains fixes for more robust releasing of resources in failure
>> scenarios. Is trying 1.1.4 an option?
>>
>> – Ufuk
>>
>>> On Thu, Jan 12, 2017 at 1:18 AM, Shannon Carey <[hidden email]> wrote:
>>> I'm having pretty frequent issues with the exception below. It basically
>>> always ends up killing my cluster after forcing a large number of job
>>> restarts. I just can't keep Flink up & running.
>>>
>>> I am running Flink 1.1.3 on EMR 5.2.0. I already tried updating the
>>> emrfs-site config fs.s3.maxConnections from the default (50) to 75, after
>>> AWS support told me the name of the config option. However, that hasn't
>>> fixed the problem. Assuming that increasing the maxConnections again doesn't
>>> fix the problem, is there anything else I can do? Is anyone else having this
>>> problem? Is it possible that the state backend isn't properly calling
>>> close() on its filesystem objects? Or is there a large number of concurrent
>>> open filesystem objects for some reason? I am using the default
>>> checkpointing settings with one checkpoint at a time, checkpointing every 10
>>> minutes. If I am reading the metrics correctly, the checkpoint duration is
>>> between 12s and 3 minutes on one of the jobs, and 5s or less on the other 3.
>>> Any help is appreciated.
>>>
>>> java.lang.RuntimeException: Could not initialize state backend.
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:121)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setup(AbstractUdfStreamOperator.java:82)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:276)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:105)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by:
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AmazonClientException:
>>> Unable to execute HTTP request: Timeout waiting for connection from pool
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:618)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1015)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:22)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:7)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:75)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:94)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:39)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:211)
>>> at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at
>>> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>>> at
>>> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>> at com.sun.proxy.$Proxy34.retrieveMetadata(Unknown Source)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:764)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.mkdir(S3NativeFileSystem.java:1169)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.mkdirs(S3NativeFileSystem.java:1162)
>>> at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1877)
>>> at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.mkdirs(EmrFileSystem.java:399)
>>> at
>>> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:429)
>>> at
>>> org.apache.flink.runtime.state.filesystem.FsStateBackend.initializeForJob(FsStateBackend.java:249)
>>> at
>>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.initializeForJob(RocksDBStateBackend.java:237)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.createStateBackend(StreamTask.java:718)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:119)
>>> ... 13 more
>>> Caused by:
>>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException:
>>> Timeout waiting for connection from pool
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:226)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:195)
>>> at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.$Proxy36.getConnection(Unknown
>>> Source)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:423)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:837)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
>>> ... 41 more
>

Reply | Threaded
Open this post in threaded view
|

Re: Flink snapshotting to S3 - Timeout waiting for connection from pool

Shannon Carey
I haven't seen it yet, I'll let you know if I do.

My last whole-cluster failure seems to have been caused by placing too much load on the cluster. We had a job that got up to 12GB in checkpoint size. Current cluster is 6x c3.2xlarge. The logs show a lot of "java.net.SocketException: Connection reset" when trying to write checkpoints to S3, as well as repeated disconnect/reconnect with Zookeeper "Client session timed out, have not heard from server in 28301ms for sessionid 0x254bb682e214f79, closing socket connection and attempting reconnect", and things like "akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@10.0.88.37:38768/user/taskmanager#-497097074]] after [10000 ms]". Generally, it seems as if the network got overwhelmed.

-Shannon

From: Stephan Ewen <[hidden email]>
Date: Tuesday, January 24, 2017 at 8:30 AM
To: <[hidden email]>
Subject: Re: Flink snapshotting to S3 - Timeout waiting for connection from pool

Hi Shannon!

I was wondering if you still see this issue in Flink 1.1.4?

Just thinking that another possible cause for the issue could be that there is a connection leak somewhere (Flink code or user code or vendor library) and thus the S3 connector's connection pool starves.
For Flink 1.2, there is a safetynet that tracks and closes connections that go through Flink's filesystem abstraction. So that should not be an issue there any more.

Best,
Stephan



On Fri, Jan 13, 2017 at 1:04 AM, Shannon Carey <[hidden email]> wrote:
Good to know someone else has had the same problem... What did you do about it? Did it resolve on its own?

-Shannon




On 1/12/17, 11:55 AM, "Chen Qin" <[hidden email]> wrote:

>We have seen this issue back to Flink 1.0. Our finding back then was traffic congestion to AWS in internal network. Many teams too dependent on S3 and bandwidth is shared, cause traffic congestion from time to time.
>
>Hope it helps!
>
>Thanks
>Chen
>
>> On Jan 12, 2017, at 03:30, Ufuk Celebi <[hidden email]> wrote:
>>
>> Hey Shannon!
>>
>> Is this always reproducible and how long does it take to reproduce it?
>>
>> I've not seen this error before but as you say it indicates that some
>> streams are not closed.
>>
>> Did the jobs do any restarts before this happened? Flink 1.1.4
>> contains fixes for more robust releasing of resources in failure
>> scenarios. Is trying 1.1.4 an option?
>>
>> – Ufuk
>>
>>> On Thu, Jan 12, 2017 at 1:18 AM, Shannon Carey <[hidden email]> wrote:
>>> I'm having pretty frequent issues with the exception below. It basically
>>> always ends up killing my cluster after forcing a large number of job
>>> restarts. I just can't keep Flink up & running.
>>>
>>> I am running Flink 1.1.3 on EMR 5.2.0. I already tried updating the
>>> emrfs-site config fs.s3.maxConnections from the default (50) to 75, after
>>> AWS support told me the name of the config option. However, that hasn't
>>> fixed the problem. Assuming that increasing the maxConnections again doesn't
>>> fix the problem, is there anything else I can do? Is anyone else having this
>>> problem? Is it possible that the state backend isn't properly calling
>>> close() on its filesystem objects? Or is there a large number of concurrent
>>> open filesystem objects for some reason? I am using the default
>>> checkpointing settings with one checkpoint at a time, checkpointing every 10
>>> minutes. If I am reading the metrics correctly, the checkpoint duration is
>>> between 12s and 3 minutes on one of the jobs, and 5s or less on the other 3.
>>> Any help is appreciated.
>>>
>>> java.lang.RuntimeException: Could not initialize state backend.
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:121)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setup(AbstractUdfStreamOperator.java:82)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:276)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:105)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by:
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AmazonClientException:
>>> Unable to execute HTTP request: Timeout waiting for connection from pool
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:618)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1015)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:22)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:7)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:75)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:94)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:39)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:211)
>>> at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at
>>> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>>> at
>>> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>> at com.sun.proxy.$Proxy34.retrieveMetadata(Unknown Source)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:764)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.mkdir(S3NativeFileSystem.java:1169)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.mkdirs(S3NativeFileSystem.java:1162)
>>> at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1877)
>>> at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.mkdirs(EmrFileSystem.java:399)
>>> at
>>> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:429)
>>> at
>>> org.apache.flink.runtime.state.filesystem.FsStateBackend.initializeForJob(FsStateBackend.java:249)
>>> at
>>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.initializeForJob(RocksDBStateBackend.java:237)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.createStateBackend(StreamTask.java:718)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:119)
>>> ... 13 more
>>> Caused by:
>>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException:
>>> Timeout waiting for connection from pool
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:226)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:195)
>>> at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.$Proxy36.getConnection(Unknown
>>> Source)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:423)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:837)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
>>> ... 41 more
>

Reply | Threaded
Open this post in threaded view
|

Re: Flink snapshotting to S3 - Timeout waiting for connection from pool

Chen Qin
In reply to this post by Shannon Carey
We worked around S3 and had a beer with our Hadoop engineers...
Reply | Threaded
Open this post in threaded view
|

Re: Flink snapshotting to S3 - Timeout waiting for connection from pool

Shannon Carey
Haha, I see. Thanks.




On 1/26/17, 1:48 PM, "Chen Qin" <[hidden email]> wrote:

>We worked around S3 and had a beer with our Hadoop engineers...
>
>
>
>--
>View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-snapshotting-to-S3-Timeout-waiting-for-connection-from-pool-tp10994p11330.html
>Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
>