All,
I am starting to notice strange behavior in one particular streaming app. I initially thought it was a producer issue, as I was seeing timeout exceptions (records expiring in the queue). I tried modifying request.timeout.ms, linger.ms, etc. in case the issue was caused by a sudden burst of data or something along those lines. However, that only caused the app to build up back pressure and get slower and slower until the timeout was reached. With lower timeouts, the app would actually raise the exception and recover faster.

I can tell it is not related to connectivity, as other apps connected to the same brokers from the same data nodes are running just fine in the same time frame (we have at least 10 streaming apps connected to the same list of brokers). We have enabled the Graphite reporter in all of our applications. After a deep dive into some of the consumer and producer stats, I noticed that the consumer fetch-rate drops tremendously while fetch-size grows exponentially BEFORE the producer actually starts to show higher response-time and lower rates. Eventually, connection resets start to occur and connection counts go up momentarily, after which things get back to normal.

Data producer rates remain constant around that time frame - we have a Logstash producer sending data over. We checked both Logstash and Kafka metrics, and they seem to show the same pattern (a sort of sine wave) throughout.

It seems to point to a Kafka issue (perhaps some tuning between the Flink app and Kafka), but I wanted to check with the experts before I start knocking on the Kafka admin's door. Is there anything else I can look into? There are quite a few default stats in Graphite, but those were the ones that made the most sense.

Thanks,
Ashish
|
Hi Ashish,

Gordon (in CC) might be able to help you.

Cheers,
Fabian

2017-11-05 16:24 GMT+01:00 Ashish Pokharel <[hidden email]>:
> All,
|
Thanks Fabian.
I am seeing this consistently and can definitely use some help. I have plenty of Grafana views I can share if that helps :)
|
Hi Ashish,

From your description I do not yet have much of an idea of what may be happening. However, some of your observations seem reasonable. I'll go through them one by one:

> I did try to modify request.timeout.ms, linger.ms etc to help with the issue if it were caused by a sudden burst of data or something along those lines. However, what it caused the app to increase back pressure and made the slower and slower until that timeout is reached.

If the client is having trouble writing outstanding records to Kafka and the timeout is increased, then I think increased back pressure is indeed the expected behavior.

> I noticed that consumer fetch-rate drops tremendously while fetch-size grows exponentially BEFORE the producer actually start to show higher response-time and lower rates.

The fetch loop in the consumer is blocked temporarily when back pressure propagates from downstream operators, resulting in longer fetch intervals and larger batches on each fetch (given that the event rate is still constant).

Therefore, I think the root cause is still on the producer side. Would you happen to have any logs that show useful information from the producer side? I think we have a better chance of finding out what is going on by digging there. Also, which Flink version and Kafka version are you using?

Cheers,
Gordon
On 5 November 2017 at 11:24:49 PM, Ashish Pokharel ([hidden email]) wrote:
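Gordon's point about fetch intervals and batch sizes can be illustrated with some simple arithmetic. The following is only a sketch under stated assumptions: the class name, the event rate of 2000 records per second, and the fetch intervals are all hypothetical, and real fetch sizes are additionally capped by consumer settings that are ignored here.

```java
public class FetchBatchSketch {

    /** Fetches per second if the consumer completes one fetch every fetchIntervalMs. */
    static double fetchesPerSecond(long fetchIntervalMs) {
        return 1000.0 / fetchIntervalMs;
    }

    /** Records that pile up between two fetches when the arrival rate is constant. */
    static long recordsPerFetch(long eventsPerSecond, long fetchIntervalMs) {
        return eventsPerSecond * fetchIntervalMs / 1000;
    }

    public static void main(String[] args) {
        long eventsPerSecond = 2000;                  // hypothetical constant input rate
        long[] intervalsMs = {100, 500, 2000, 10000}; // fetch loop blocked for longer and longer

        for (long intervalMs : intervalsMs) {
            System.out.printf("interval=%d ms  fetch-rate=%.2f/s  records-per-fetch=%d%n",
                    intervalMs,
                    fetchesPerSecond(intervalMs),
                    recordsPerFetch(eventsPerSecond, intervalMs));
        }
    }
}
```

With the input rate held constant, the fetch rate falls and the per-fetch volume rises as the interval grows, which matches the shape of the metrics described earlier in the thread without any change on the Kafka side.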
|
Hi Gordon,
Thanks for your responses. It definitely makes sense. I could pull this stack trace from the logs. The entire log is pretty big - let me know if some samples before/after this may help.

TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator}
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction.apply(PassThroughWindowFunction.java:35)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:44)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:597)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:552)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:253)
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:217)
    ... 7 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    ... 18 more
Caused by: java.lang.Exception: Failed to send data to Kafka: Expiring 7 record(s) for prod.app.stats.preproc-1: 33473 ms has passed since last append
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    ... 24 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 7 record(s) for prod.app.stats.preproc-1: 33473 ms has passed since last append

Also, for reference, here is my ProducerConfig from the logs:

INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
    acks = 1
    batch.size = 4096
    block.on.buffer.full = false
    bootstrap.servers = [xxxxxxx:xxx,xxxxxx:xxx]
    buffer.memory = 33554432
    client.id =
    compression.type = none
    connections.max.idle.ms = 540000
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 500
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.fetch.timeout.ms = 25000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 5
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    timeout.ms = 30000
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

Thanks,
Ashish
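For readers comparing the exception with the configuration above, here is a rough sketch of how the logged values relate. It rebuilds a handful of the logged settings as plain java.util.Properties and compares the 33473 ms from the exception with request.timeout.ms. The class name, the method names, and the broker addresses are hypothetical, and the expiry check is a simplifying assumption (a queued batch is treated as expired once it has waited longer than request.timeout.ms); the real client logic has more cases.

```java
import java.util.Properties;

public class ProducerTimeoutSketch {

    /** A subset of the producer settings shown in the log above. */
    static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder hosts
        props.setProperty("acks", "1");
        props.setProperty("batch.size", "4096");
        props.setProperty("linger.ms", "500");
        props.setProperty("request.timeout.ms", "30000");
        props.setProperty("retries", "5");
        return props;
    }

    /**
     * Simplified assumption: a batch waiting in the producer queue is treated as
     * expired once it has waited longer than request.timeout.ms.
     */
    static boolean exceedsRequestTimeout(long msSinceLastAppend, Properties props) {
        long requestTimeoutMs = Long.parseLong(props.getProperty("request.timeout.ms"));
        return msSinceLastAppend > requestTimeoutMs;
    }

    public static void main(String[] args) {
        Properties props = producerProps();
        long msSinceLastAppend = 33473; // value reported in the TimeoutException
        System.out.println("request.timeout.ms = " + props.getProperty("request.timeout.ms"));
        System.out.println("exceeds timeout    = " + exceedsRequestTimeout(msSinceLastAppend, props));
    }
}
```

Under that assumption, the 33473 ms in the exception is just past the configured 30000 ms, which would fit batches sitting unsent in the producer's queue rather than a request failing mid-flight. Raising request.timeout.ms would only let them wait longer, in line with the growing back pressure described at the start of the thread.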
|
Hi Gordon,
Any further thoughts on this? I forgot to mention that I am using Flink 1.3.2 and our Kafka is 0.10. We are in the process of upgrading Kafka, but the upgrade won’t be in Prod for at least a couple of months.

Thanks,
Ashish
|