Hi guys,
I want to know the default behavior of the Kafka source when a Kafka cluster goes down during streaming. Will the job status go to failing, or is the exception caught, with a back-off before the source tries to poll for more events?

Best,
Nick.
|
Hi,
Flink doesn't do any special failure handling or retry logic, so it’s up to how the KafkaConsumer is configured via properties. In general, Flink doesn’t try to be smart: when something fails, an exception will bubble up and fail this execution of the job. If checkpoints are enabled, this will trigger a restore, which is controlled by the restart strategy. If that eventually gives up, the job will go to “FAILED” and stop.

This is the relevant section of the docs: https://ci.apache.org/projects/flink/flink-docs-stable/dev/task_failure_recovery.html

Best,
Aljoscha

On 15.07.20 17:42, Nick Bendtner wrote:
|
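To make the sequence Aljoscha describes concrete (an exception bubbles up, the restart strategy retries, and the job ends in FAILED once the attempts run out), here is a minimal plain-Java model. It is not Flink code: `FixedDelayRestartSketch`, `Execution`, and `JobStatus` are hypothetical names, and the attempt count and delay are made-up values. In a real job this behaviour comes from the configured restart strategy, as covered in the linked docs.

```java
// Hypothetical illustration only, not Flink's implementation:
// models "restart after a fixed delay, give up after N restarts".
final class FixedDelayRestartSketch {
    enum JobStatus { FINISHED, FAILED }

    // One execution attempt of the job; any exception counts as a failure.
    interface Execution {
        void run() throws Exception;
    }

    // Runs the execution, restarting it up to maxRestarts times with
    // delayMillis between attempts. Returns FAILED once restarts are exhausted.
    static JobStatus run(Execution execution, int maxRestarts, long delayMillis) {
        int restarts = 0;
        while (true) {
            try {
                execution.run();
                return JobStatus.FINISHED;
            } catch (Exception e) {
                if (restarts >= maxRestarts) {
                    return JobStatus.FAILED; // strategy gave up
                }
                restarts++;
                try {
                    Thread.sleep(delayMillis); // fixed delay before the next attempt
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return JobStatus.FAILED;
                }
            }
        }
    }

    public static void main(String[] args) {
        // An execution that always fails: one initial attempt plus 3 restarts.
        JobStatus status = run(() -> {
            throw new IllegalStateException("simulated failure");
        }, 3, 100L);
        System.out.println(status);
    }
}
```

The point of the model is that the retry loop only starts when an exception actually reaches it. If the failing component swallows the error and keeps retrying internally, `run` never sees a failure and the job simply stays up.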
Hi,
I don't observe this behaviour though; we use Flink 1.7.2. I stopped Kafka and ZooKeeper on all broker nodes. On the Flink side, I see the messages below in the log (data is obfuscated). There are no error logs. The Kafka consumer properties are:
1. "bootstrap.servers"
2. "zookeeper.connect"
3. "auto.offset.reset"
4. "group.id"
5. "security.protocol"

The Flink consumer starts consuming data as soon as Kafka comes back up. So I want to know in what scenario, or with what Kafka consumer config, the job will go to the failed state after a finite number of restart attempts from checkpoint.

TM log:
2020-08-04 19:50:55,539 WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-5, groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1003 (yyyrspapd036.xxx.com/ss.mm.120.124:9093) could not be established. Broker may not be available.
2020-08-04 19:50:55,540 WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-4, groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1002 (yyyrspapd037.xxx.com/ss.mm.120.125:9093) could not be established. Broker may not be available.
2020-08-04 19:50:55,791 WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-4, groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1004 (yyyrspapd035.xxx.com/ss.mm.120.123:9093) could not be established. Broker may not be available.
2020-08-04 19:50:55,791 WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-6, groupId=flink-AIP-QA-Audit-Consumer] Connection to node 1003 (yyyrspapd036.xxx.com/ss.mm.120.124:9093) could not be established. Broker may not be available.

Best,
Nick

On Mon, Jul 20, 2020 at 10:27 AM Aljoscha Krettek <[hidden email]> wrote:
|
Hi Nick,
What Aljoscha was trying to say is that Flink is not trying to do any magic. If `KafkaConsumer`, which is used under the hood of the `FlinkKafkaConsumer` connector, throws an exception, this exception bubbles up and causes the job to fail over. If the `KafkaConsumer` handles the failure silently, then no exception reaches Flink and the job keeps running. As we can see in the TM log that you attached, the latter seems to be happening. Note that the warning is logged by "org.apache.kafka.clients.NetworkClient", so that's not code we (Flink developers) control.

If you want to change this behaviour, then unless someone on this mailing list happens to know the answer, the better place to ask is the Kafka mailing list. Maybe there is some way to configure this.

And sorry, I don't know much about either the KafkaConsumer or the Kafka broker configuration :(

Piotrek

On Tue, Aug 4, 2020 at 22:04, Nick Bendtner <[hidden email]> wrote:
|
+user group. On Wed, Aug 5, 2020 at 12:57 PM Nick Bendtner <[hidden email]> wrote:
|
Hi Nick,
Could you elaborate: which event would you like Flink to handle, and how? Is there some Kafka API that can be used to listen for such events? Becket, do you maybe know something about this?

As a side note, Nick, can't you configure some timeouts [1] in the KafkaConsumer, like `request.timeout.ms` or `consumer.timeout.ms`? But as I wrote before, that would be more of a question for the Kafka guys.

Piotrek

On Wed, Aug 5, 2020 at 19:58, Nick Bendtner <[hidden email]> wrote:
|
Hi Piotr,
Sorry for the late reply. So the poll does not throw an exception when a broker goes down. In Spring they solve it by generating an event [1] whenever this happens, and you can intercept this event. `consumer.timeout.ms` helps to some extent, but if the source topic does not receive any messages for the specified time, it still throws an exception.

Best,
Nick.

On Wed, Aug 5, 2020 at 1:30 PM Piotr Nowojski <[hidden email]> wrote:
|
Hey,
But do you know which API Kafka provides that Spring uses to offer this feature?

Piotrek

On Thu, Aug 13, 2020 at 17:15, Nick Bendtner <[hidden email]> wrote:
|
Hey Nick and Piotr,
The KafkaConsumer in Apache Kafka itself does not throw any exception if the broker is down. There isn't any API in KafkaConsumer telling you that the brokers are not reachable. Instead, the consumer just keeps retrying to fetch the records. It is designed this way so that when there is a Kafka failure in an organization, people won't have to restart all the downstream applications after Kafka is up again.

The Spring Kafka consumer is a community project which wraps the Java KafkaConsumer from Apache Kafka. It emits a special event if no message is received from the consumer.poll() call for some time. As Nick mentioned, that does not necessarily mean that the broker is down. It simply means that no message has been consumed from Kafka for some time.

Nick, can you elaborate a little bit on why you would like to have an exception thrown in your Flink app when Kafka is down, rather than letting it run until Kafka is up again?

Thanks,
Jiangjie (Becket) Qin

On Fri, Aug 14, 2020 at 4:28 PM Piotr Nowojski <[hidden email]> wrote:
|
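If failing the job after a period of silence is really what is wanted, the check could live in the job's own code, and throw so that the exception bubbles up as described earlier in the thread. Below is a minimal plain-Java sketch of such an idle check. `IdleSourceWatchdog` and its methods are hypothetical names, not a Flink, Kafka, or Spring API, and how to call it from a running job (for example from a periodic timer) is left open here. As Becket points out, silence does not prove the brokers are down, so a quiet topic would trip this check too.

```java
// Hypothetical helper, not part of Flink, Kafka, or Spring.
// Tracks when the last record was seen and fails once the gap is too long.
final class IdleSourceWatchdog {
    private final long maxIdleMillis;
    private long lastRecordMillis;

    IdleSourceWatchdog(long maxIdleMillis, long nowMillis) {
        this.maxIdleMillis = maxIdleMillis;
        this.lastRecordMillis = nowMillis;
    }

    // Call whenever a record arrives.
    void onRecord(long nowMillis) {
        lastRecordMillis = nowMillis;
    }

    // Call periodically; throws if no record arrived within the idle limit.
    void check(long nowMillis) {
        long idleMillis = nowMillis - lastRecordMillis;
        if (idleMillis > maxIdleMillis) {
            throw new IllegalStateException(
                "No records for " + idleMillis + " ms (limit " + maxIdleMillis + " ms)");
        }
    }

    public static void main(String[] args) {
        IdleSourceWatchdog watchdog =
            new IdleSourceWatchdog(30_000L, System.currentTimeMillis());
        watchdog.onRecord(System.currentTimeMillis());
        watchdog.check(System.currentTimeMillis()); // passes: a record was just seen
    }
}
```

Passing the current time in as a parameter keeps the class easy to test and leaves the choice of clock to the caller.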