Kafka Consumer fetch-size/rate and Producer queue timeout


Ashish Pokharel
All,

I am starting to notice a strange behavior in a particular streaming app. I initially thought it was a Producer issue, as I was seeing timeout exceptions (records expiring in the queue). I did try to modify request.timeout.ms, linger.ms, etc. to help with the issue in case it was caused by a sudden burst of data or something along those lines. However, that only caused the app to build up back pressure and become slower and slower until the timeout was reached; with lower timeouts, the app would actually raise the exception and recover faster. I can tell it is not related to connectivity, as other apps connected to the same brokers (we have at least 10 streaming apps on the same list of brokers) and running from the same data nodes were just fine in the same time frame.

We have enabled the Graphite Reporter in all of our applications. After deep diving into some of the consumer and producer stats, I noticed that the consumer fetch-rate drops tremendously while fetch-size grows exponentially BEFORE the producer actually starts to show higher response times and lower rates. Eventually, connection resets start to occur and connection counts go up momentarily, after which things get back to normal. Data producer rates remain constant around that time frame - we have a Logstash producer sending the data over. We checked both the Logstash and Kafka metrics and they seem to show the same pattern (a sort of sine wave) throughout.
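
For reference, this is roughly how the producer properties are wired into the Kafka sink in this job (a minimal sketch only - the class name, topic, brokers, and the placeholder source below are illustrative, not our actual job; the property values match the ProducerConfig dump further down the thread):

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ProducerConfigSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source standing in for the real pipeline.
        DataStream<String> output = env.fromElements("example-record");

        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder brokers
        // The knobs I have been experimenting with:
        producerProps.setProperty("request.timeout.ms", "30000");
        producerProps.setProperty("linger.ms", "500");
        producerProps.setProperty("batch.size", "4096");
        producerProps.setProperty("retries", "5");

        output.addSink(new FlinkKafkaProducer010<String>(
                "some.output.topic",          // placeholder topic
                new SimpleStringSchema(),
                producerProps));

        env.execute("producer-config-sketch");
    }
}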

It seems to point to a Kafka issue (perhaps some tuning between the Flink app and Kafka), but I wanted to check with the experts before I start knocking down our Kafka admins' doors. Is there anything else I can look into? There are quite a few default stats in Graphite, but those were the ones that made the most sense.

Thanks, Ashish

Re: Kafka Consumer fetch-size/rate and Producer queue timeout

Fabian Hueske-2
Hi Ashish,

Gordon (in CC) might be able to help you.

Cheers, Fabian



Re: Kafka Consumer fetch-size/rate and Producer queue timeout

Ashish Pokharel
Thanks Fabian.

I am seeing this consistently and can definitely use some help. I have plenty of Grafana views I can share if that helps :)




Re: Kafka Consumer fetch-size/rate and Producer queue timeout

Tzu-Li (Gordon) Tai
Hi Ashish,

From your description I do not yet have much of an idea of what may be happening.
However, some of your observations seem reasonable. I'll go through them one by one:

I did try to modify request.timeout.ms, linger.ms, etc. to help with the issue in case it was caused by a sudden burst of data or something along those lines. However, that only caused the app to build up back pressure and become slower and slower until the timeout was reached.

If the client is experiencing trouble in writing outstanding records to Kafka, and the timeout is increased, then I think increased back pressure is indeed the expected behavior.

I noticed that the consumer fetch-rate drops tremendously while fetch-size grows exponentially BEFORE the producer actually starts to show higher response times and lower rates.

Drops in fetch-rate and growth in fetch-size in the Flink Kafka consumer should be a natural consequence of backpressure in the job.
The fetch loop in the consumer is blocked temporarily whenever backpressure propagates from downstream operators, resulting in longer fetch intervals and larger batches on each fetch (given that the event rate stays constant).
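
To illustrate the shape of that, here is a stripped-down loop with the same structure (this is not the actual connector code; the brokers, topic, and the sleep standing in for a slow downstream are placeholders):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FetchBehaviourSketch {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092");   // placeholder
        props.setProperty("group.id", "fetch-behaviour-sketch");  // placeholder
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("some.input.topic")); // placeholder topic
            while (true) {
                long start = System.currentTimeMillis();
                ConsumerRecords<byte[], byte[]> records = consumer.poll(100);  // one "fetch"
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    // Stand-in for a backpressured downstream (e.g. a slow Kafka producer sink):
                    // while this blocks, no poll() happens and records pile up broker-side ...
                    Thread.sleep(5);
                }
                // ... so the next poll() returns a bigger batch after a longer interval:
                // fetch-rate drops and fetch-size grows while the input rate stays constant.
                System.out.printf("fetched %d records after %d ms%n",
                        records.count(), System.currentTimeMillis() - start);
            }
        }
    }
}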
Therefore, I think the root cause still lies on the producer side.

Would you happen to have any logs that might show useful information on the producer side?
I think we have a better chance of finding out what is going on by digging there.
Also, which Flink and Kafka versions are you using?

Cheers,
Gordon


Re: Kafka Consumer fetch-size/rate and Producer queue timeout

Ashish Pokharel
Hi Gordon,

Thanks for your responses. It definitely makes sense. 

I could pull this stack trace from the logs; the entire log itself is pretty big - let me know if some samples before/after this would help.
 
TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator}
                at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:219)
                at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
                at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
                at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
                at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
                at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
                at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
                at org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction.apply(PassThroughWindowFunction.java:35)
                at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:44)
                at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:597)
                at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:552)
                at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:253)
                at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:217)
                ... 7 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
                at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
                at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
                at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
                ... 18 more
Caused by: java.lang.Exception: Failed to send data to Kafka: Expiring 7 record(s) for prod.app.stats.preproc-1: 33473 ms has passed since last append
                at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
                at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
                at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421)
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
                ... 24 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 7 record(s) for prod.app.stats.preproc-1: 33473 ms has passed since last append

Also, for reference, here is my ProducerConfig from the logs:

INFO  org.apache.kafka.clients.producer.ProducerConfig              - ProducerConfig values:
                acks = 1
                batch.size = 4096
                block.on.buffer.full = false
                bootstrap.servers =[xxxxxxx:xxx,xxxxxx:xxx]
                buffer.memory = 33554432
                client.id = 
                compression.type = none
                connections.max.idle.ms = 540000
                interceptor.classes = null
                key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
                linger.ms = 500
                max.block.ms = 60000
                max.in.flight.requests.per.connection = 5
                max.request.size = 1048576
                metadata.fetch.timeout.ms = 25000
                metadata.max.age.ms = 300000
                metric.reporters = []
                metrics.num.samples = 2
                metrics.sample.window.ms = 30000
                partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
                receive.buffer.bytes = 32768
                reconnect.backoff.ms = 50
                request.timeout.ms = 30000
                retries = 5
                retry.backoff.ms = 100
                sasl.jaas.config = null
                sasl.kerberos.kinit.cmd = /usr/bin/kinit
                sasl.kerberos.min.time.before.relogin = 60000
                sasl.kerberos.service.name = null
                sasl.kerberos.ticket.renew.jitter = 0.05
                sasl.kerberos.ticket.renew.window.factor = 0.8
                sasl.mechanism = GSSAPI
                security.protocol = PLAINTEXT
                send.buffer.bytes = 131072
                ssl.cipher.suites = null
                ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
                ssl.endpoint.identification.algorithm = null
                ssl.key.password = null
                ssl.keymanager.algorithm = SunX509
                ssl.keystore.location = null
                ssl.keystore.password = null
                ssl.keystore.type = JKS
                ssl.protocol = TLS
                ssl.provider = null
                ssl.secure.random.implementation = null
                ssl.trustmanager.algorithm = PKIX
                ssl.truststore.location = null
                ssl.truststore.password = null
                ssl.truststore.type = JKS
                timeout.ms = 30000
                value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
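
As far as I can tell (this is my reading of the 0.10.x producer behavior, so please correct me if it is wrong), a full batch that cannot be sent is expired once more than request.timeout.ms has passed since the last append, which would line up with the numbers above:

// Back-of-the-envelope check of the expiry message against the ProducerConfig dump above.
// The values are copied from the logs; the interpretation of the expiry rule is my assumption.
public class ExpiryBudgetCheck {
    public static void main(String[] args) {
        long requestTimeoutMs = 30_000;   // request.timeout.ms from the config dump
        long observedWaitMs   = 33_473;   // "33473 ms has passed since last append"

        System.out.printf("batch waited %d ms vs. request.timeout.ms of %d ms -> over budget by %d ms%n",
                observedWaitMs, requestTimeoutMs, observedWaitMs - requestTimeoutMs);
    }
}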

Thanks, Ashish



Re: Kafka Consumer fetch-size/rate and Producer queue timeout

Ashish Pokharel
Hi Gordon,

Any further thoughts on this?

I forgot to mention that I am using Flink 1.3.2 and our Kafka is 0.10. We are in the process of upgrading Kafka, but it won't be in prod for at least a couple of months.

Thanks, Ashish
