Team,

One more question to the community regarding hardening Flink apps. Let me start by saying we do have known Kafka bottlenecks, which we are in the midst of resolving. So, during certain times of the day, a lot of our Flink apps are seeing Kafka producer timeout issues. Most of the logs are some flavor of this:

java.lang.Exception: Failed to send data to Kafka: Expiring 28 record(s) for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation plus linger time
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 28 record(s) for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation plus linger time

Timeouts are not necessarily good, but I am sure we understand this is bound to happen (hopefully less often). The issue for us, however, is that it almost looks like Flink is stopping and restarting all operators (a lot of other operators, including Map, Reduce, and Process functions, if not all of them) along with the Kafka producers. We are processing a pretty substantial load in Flink and don't really intend to enable RocksDB/HDFS checkpointing in some of these apps - we are OK with sustaining some data loss when an app crashes completely, or something along those lines. However, what we are noticing here is that all the data held in memory for the sliding window functions is also lost completely because of this.
I would have thought that, because of the retry settings in the Kafka producer, even those 28 events in the queue should have been recovered, let alone the over a million events in memory state waiting to be folded/reduced for the sliding window. This doesn't feel right :) Is the only way to solve this to create a RocksDB/HDFS checkpoint? Why would almost the entire job graph restart on an operator timeout? Do I need to do something simple like disabling operator chaining? We really, really are trying to use just memory, and not any other state backend, for these heavy-hitting streams.

Thanks for your help,
Ashish
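For what it's worth, disabling operator chaining is a one-line change on the execution environment. A minimal, hypothetical sketch (the pipeline below is a placeholder, not the actual job); note that chaining only controls how operators are fused into tasks, so by itself it does not change how far a failure restart reaches:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Disable chaining for the whole job: every operator runs as its own task.
        env.disableOperatorChaining();

        // Trivial placeholder pipeline so the sketch actually runs.
        env.fromElements("a", "b", "c")
           .map(new MapFunction<String, String>() {
               @Override
               public String map(String value) {
                   return value.toUpperCase();
               }
           })
           .print();

        env.execute("chaining-sketch");
    }
}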
Hi Ashish,

Originally, Flink always performed full recovery in case of a failure, i.e., it restarted the complete application.
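For reference, the restart behavior itself is configured via a restart strategy on the execution environment. A minimal sketch against the DataStream API, with a placeholder attempt count and delay (not values recommended in this thread); under the default full-recovery behavior described above, the whole job graph is restarted, which is why non-checkpointed in-memory window state is lost:

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // On failure, restart the job up to 3 times, waiting 10 seconds between
        // attempts (placeholder values).
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

        // ... build and execute the actual pipeline here ...
    }
}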
Fabian,
Thanks for your feedback - very helpful as usual! This is sort of becoming a huge problem for us right now because of our Kafka situation. For some reason I missed this detail when going through the docs. We are definitely seeing a heavy dose of data loss when Kafka timeouts are happening. We are actually on version 1.4 - I'd be interested to understand if anything can be done in 1.4 to prevent this scenario. One other thought I had was the ability to invoke "checkpointing before restart/recovery" -> meaning I don't necessarily need to checkpoint periodically, but I do want to make sure that on an explicit restart/rescheduling like this we have a decent "last known" state. Not sure if this is currently doable.

Thanks,
Ashish
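As far as I know there is no built-in "checkpoint only right before a restart" hook; the standard way to keep window state across a restart is periodic checkpointing to a durable backend. A minimal sketch, with a hypothetical HDFS path and a placeholder interval:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Periodic checkpoints every 60 seconds (placeholder interval).
        env.enableCheckpointing(60000);

        // Durable state backend; the HDFS path below is a hypothetical example.
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

        // ... rest of the pipeline; window state is then restored after a restart ...
    }
}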
Try setting the Kafka producer config option for the number of retries ("retries") to a large number to avoid the timeout. It defaults to zero. Do note that retries may result in reordered records.
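Concretely, these producer properties are passed straight through to the Flink Kafka connector constructor. A minimal sketch assuming the 0.10 connector that appears in the stack trace above (the broker address, topic name, source, and schema are placeholders):

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ProducerRetriesSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.fromElements("a", "b", "c"); // placeholder source

        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "broker1:9092"); // placeholder
        // Retry sends instead of failing the sink when the broker is slow.
        producerProps.setProperty("retries", String.valueOf(Integer.MAX_VALUE));

        stream.addSink(new FlinkKafkaProducer010<>("my-topic", new SimpleStringSchema(), producerProps));

        env.execute("producer-retries-sketch");
    }
}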
The reordering issue can be resolved by setting MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION (the max.in.flight.requests.per.connection producer config) to 1, if we are talking about pure Kafka producer configs (and I believe they port over to the Flink Kafka connector). This does limit the concurrency (at the TCP level) when Kafka is back up, an issue which is not very limiting once the batch.size and linger.ms configurations of the Kafka producer have been understood and set up optimally.
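Putting the suggestions from this thread together, a hypothetical producer configuration could look roughly like this; all values are placeholders to be tuned, and the helper class name is invented for illustration. It could be passed to the FlinkKafkaProducer010 constructor from the earlier sketch.

import java.util.Properties;

public class OrderedProducerConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092");          // placeholder
        props.setProperty("retries", String.valueOf(Integer.MAX_VALUE)); // retry instead of failing
        // At most one in-flight request per connection, so retries cannot reorder records.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        // Batching knobs to tune once Kafka is healthy again (placeholder values).
        props.setProperty("batch.size", "16384"); // bytes
        props.setProperty("linger.ms", "5");      // milliseconds
        return props;
    }
}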