Flink job fails with org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

9 messages

Flink job fails with org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

John Smith
Just wondering, is this on the client side, in the Flink job? I rebooted the task node and the job deployed correctly on another node.

Is there a specific ulimit that we should set for Flink task nodes?

org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:799)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:650)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:630)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:504)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Too many open files
at org.apache.kafka.common.network.Selector.<init>(Selector.java:154)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:188)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:192)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:722)
... 11 more
Caused by: java.io.IOException: Too many open files
at sun.nio.ch.IOUtil.makePipe(Native Method)
at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65)
at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
at java.nio.channels.Selector.open(Selector.java:227)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:152)
... 14 more

Re: Flink job fails with org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

Kostas Kloudas
Hi John,

As you suggested, I would also lean towards increasing the number of allowed open file handles, but for recommendations on best practices I am cc'ing Piotr, who may be more familiar with the Kafka consumer.

Cheers,
Kostas

On Tue, Feb 11, 2020 at 9:43 PM John Smith <[hidden email]> wrote:

> [...]

Re: Flink job fails with org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

John Smith
Hi Piotr, any thoughts on this?

On Wed., Feb. 12, 2020, 3:29 a.m. Kostas Kloudas, <[hidden email]> wrote:
> [...]

Re: Flink job fails with org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

Piotr Nowojski
Hey, sorry but I know very little about the KafkaConsumer. I hope that someone else might know more.

However, did you try to google this issue? It doesn't sound like a Flink-specific problem, but rather like a general Kafka issue. Also, the solution might be as simple as bumping the limit of open files on the Unix system (the ulimit command, if I remember correctly?).
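
For reference, checking and bumping the limit could look like this (a sketch, assuming the task nodes run Linux and the TaskManager is launched from a shell; 65536 is an arbitrary example value):

    # Current soft and hard limits on open files for this shell
    ulimit -n
    ulimit -Hn
    # Raise the soft limit for the current session; going above the hard
    # limit requires root (or a change in /etc/security/limits.conf)
    ulimit -n 65536

Note this only affects the session that starts the TaskManager, so it would have to run before Flink is started.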

Piotrek

On 14 Feb 2020, at 23:35, John Smith <[hidden email]> wrote:

> [...]


Re: Flink job fails with org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

John Smith
I think so too. But I was wondering whether this was the consumer or the actual Kafka broker. The error was displayed on the Flink task node where the task was running, and the brokers looked fine at the time.
I have about a dozen topics, all single-partition except one with 18 partitions, so I really doubt the broker machines ran out of file descriptors.
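
Next time it happens I could check something like this on the task node (a sketch, assuming Linux; <pid> is the TaskManager process id):

    # Find the TaskManager JVM (the main class name varies across Flink versions)
    jps | grep -i taskmanager
    # Open descriptors vs. the limit of that process
    ls /proc/<pid>/fd | wc -l
    grep "Max open files" /proc/<pid>/limits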

On Mon, 17 Feb 2020 at 05:21, Piotr Nowojski <[hidden email]> wrote:
> [...]


Re: Flink job fails with org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

Piotr Nowojski
But it could be a Kafka client issue on the Flink side (as the stack trace suggests). You can simply try to increase the limit of open files for Flink, or try to identify who is opening all of those files and limit it somehow. If it is indeed the Kafka client, maybe it can be configured to use fewer connections (that's why I suggested searching for this), or you could just decrease the parallelism of the job.
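
To see who is holding the descriptors, something like this might help (a sketch, assuming Linux with lsof installed; <pid> is the TaskManager process id and 9092 is the assumed broker port):

    # Everything the TaskManager has open, grouped by type (lsof also lists
    # memory-mapped files and threads, so the totals are approximate)
    lsof -p <pid> | awk '{print $5}' | sort | uniq -c | sort -rn
    # Only the sockets to the Kafka brokers (-a ANDs the two selections)
    lsof -a -p <pid> -i :9092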

Piotrek 

On 19 Feb 2020, at 22:34, John Smith <[hidden email]> wrote:

> [...]


Re: Flink job fails with org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

John Smith
OK, I have 9 jobs running over 3 nodes. Most jobs are set to parallelism 1, worst case 2, so let's assume the maximum total parallelism would be 18.

I will try increasing the ulimit and hopefully we won't see it again...
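
In case it helps anyone else, this is roughly what I plan to set (a sketch; the flink-taskmanager service name and the 65536 value are just assumptions from my setup):

    # If the TaskManager runs under systemd: sudo systemctl edit flink-taskmanager
    [Service]
    LimitNOFILE=65536

    # If it is launched as user 'flink' from a login shell, in /etc/security/limits.conf:
    flink  soft  nofile  65536
    flink  hard  nofile  65536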

On Thu, 20 Feb 2020 at 04:56, Piotr Nowojski <[hidden email]> wrote:
> [...]



Re: Flink job fails with org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

John Smith

On Thu, 20 Feb 2020 at 13:58, John Smith <[hidden email]> wrote:
> [...]



Re: Flink job fails with org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

Piotr Nowojski
Thanks for reporting back. 

Piotrek

On 20 Feb 2020, at 21:29, John Smith <[hidden email]> wrote:


> [...]