Weird behaviour after changing sources in a job.


Juan Gentile

Hello!

 

We have found a weird issue while replacing the source in one of our Flink SQL jobs.

We have a job that was reading from a Kafka topic (with externalized checkpoints), and we needed to change the topic while keeping the same logic for the job/SQL.

After we restarted the job, instead of consuming only from the new Kafka topic, it consumed from both, duplicating the input of our job.

We were able to reproduce the issue, but we don’t understand whether this is a bug or expected behavior, in which case we should have restarted from a clean state.

We are currently using Flink 1.4 and Kafka 0.10.2.1.

 

Thank you,

Juan


Re: Weird behaviour after changing sources in a job.

Dawid Wysakowicz-2

Hi Juan,

I think this is somewhat expected behaviour. In order to provide proper processing semantics, Flink keeps track of partition offsets internally and checkpoints those offsets. FlinkKafkaConsumer also supports discovery of new partitions. With both of those features in mind: if you restart your job from a savepoint/checkpoint but with a changed topic, it will restore the old partitions with their checkpointed offsets and will also discover the partitions of the new topic. This is why it consumes from both the old and the new topic. If you defined your source manually (i.e. you were not using Kafka010TableSource), what you can do is set a new uid for the source and enable allowNonRestoredState. This way you will keep the state of all other operators, but you will lose the information about offsets in Kafka.


I also cc @Gordon, who might want to add something to this.
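[Editor's note: the restore-plus-discovery interaction described above can be illustrated with a toy sketch. This is plain Python, not Flink code; the function and starting position are invented for illustration only.]

```python
# Toy model of the behaviour described above: on restore, the consumer
# keeps every partition found in checkpointed state AND every partition
# found by discovery against the newly configured topic.
def partitions_after_restore(restored_offsets, discovered_partitions):
    # Start from the checkpointed (topic, partition) -> offset map.
    assigned = dict(restored_offsets)
    # Discovery adds any partition not already in state, starting fresh.
    for tp in discovered_partitions:
        assigned.setdefault(tp, "earliest")
    return assigned

state = {("old-topic", 0): 1234, ("old-topic", 1): 987}   # from checkpoint
discovered = [("new-topic", 0), ("new-topic", 1)]          # from new config
assigned = partitions_after_restore(state, discovered)
# Both topics end up assigned -> the job reads duplicated input.
print(sorted({topic for topic, _ in assigned}))  # ['new-topic', 'old-topic']
```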



Re: Weird behaviour after changing sources in a job.

Oleksandr Nitavskyi

Hello Dawid,

 

Thank you for the answer. In our case we did change the name of the Kafka source, so we expected it wouldn’t restore the state of that Kafka source operator.

In any case, shouldn’t FlinkKafkaConsumerBase have a safeguard that prevents restoring KafkaTopicPartitions from topics different from the currently consumed one?

 

Thank you

Oleksandr
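[Editor's note: the safeguard proposed above could look roughly like the following toy sketch. This is plain Python with invented names, not actual FlinkKafkaConsumerBase code: restored entries whose topic is no longer subscribed would be dropped (or rejected) instead of silently consumed.]

```python
# Hypothetical safeguard: keep only restored (topic, partition) entries
# whose topic is still among the currently subscribed topics, and
# report the ones that were dropped.
def filter_restored_partitions(restored_offsets, subscribed_topics):
    kept = {tp: off for tp, off in restored_offsets.items()
            if tp[0] in subscribed_topics}
    dropped = set(restored_offsets) - set(kept)
    return kept, dropped

restored = {("old-topic", 0): 1234, ("new-topic", 0): 42}
kept, dropped = filter_restored_partitions(restored, {"new-topic"})
print(kept)     # {('new-topic', 0): 42}
print(dropped)  # {('old-topic', 0)}
```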

 





Re: Weird behaviour after changing sources in a job.

Dawid Wysakowicz-2

Hi Oleksandr,

The mapping of state to operators is based on the operator id, not its name. That's why changing the source's name might not be enough.

That actually might be a valuable addition: a check that the restored partitions still match the provided topic/topic pattern. Would you like to open a JIRA ticket for it?

Best,

Dawid
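[Editor's note: the operator-id point above can be seen in a toy model. Plain Python, identifiers invented for illustration: savepoint state is keyed by the operator id (derived from uid() when one is set), so renaming an operator without changing its uid still restores its old state.]

```python
# Toy model: savepoint state is keyed by operator id, not display name.
def restore_state(savepoint, operators):
    # operators: list of (operator_id, display_name) pairs.
    return {name: savepoint.get(op_id) for op_id, name in operators}

savepoint = {"kafka-source-uid": {("old-topic", 0): 1234}}
# The display name changed, but the uid (hence operator id) did not,
# so the old offsets come back anyway:
restored = restore_state(savepoint, [("kafka-source-uid", "renamed source")])
print(restored)  # {'renamed source': {('old-topic', 0): 1234}}
```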



Re: Weird behaviour after changing sources in a job.

Fabian Hueske-2
Hi,

The problem is that Flink SQL does not expose the UIDs of the generated operators.
We've met that issue before, but it is still not fully clear what would be the best way to make this accessible.

Best, Fabian



Re: Weird behaviour after changing sources in a job.

Oleksandr Nitavskyi
In reply to this post by Dawid Wysakowicz-2

Great!

 

So I have created a ticket, https://issues.apache.org/jira/browse/FLINK-10342, with a test that reproduces the issue: https://github.com/apache/flink/pull/6691/files

If it seems reasonable, I can create a fix for this.

 

Regards

Oleksandr

 
