Does FlinkKafkaConsumer010 care about consumer group?

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Does FlinkKafkaConsumer010 care about consumer group?

Moiz Jinia
Below is a plan for downtime-free upgrade of a Flink job. The downstream consumer of the Flink job is duplicate proof.

Scenario 1 -
1. Start Flink job A with consumer group G1 (12 slot job)
2. While job A is running, take a savepoint AS.
3. Start newer version of Flink job A' from savepoint AS with consumer group G1 (12 slot job again)
4. Stop job A.

Scenario 2 -
1. Start Flink job A with consumer group G1 (12 slot job)
2. While job A is running, take a savepoint AS.
3. Start newer version of Flink job A' from savepoint AS with consumer group G2 (12 slot job again)
4. Stop job A

Does it matter what consumer group job A' uses? The desired behavior is that during the window when both A and A' are running, all messages should go to both jobs. (And of course I want that job A' should start consuming from the offsets in the savepoint and not the earliest).


Reply | Threaded
Open this post in threaded view
|

Re: Does FlinkKafkaConsumer010 care about consumer group?

Tzu-Li (Gordon) Tai
Hi!

The only occasions which the consumer group is used is:
1. When committing offsets back to Kafka. Since Flink 1.3, this can be disabled completely (both when checkpointing is enabled or disabled). See [1] on details about that.
2. When starting fresh (not starting from some savepoint), if you choose to use GROUP_OFFSETS as the start position, then the consumer group would also be used. If starting from a savepoint, then this is irrelevant. See [2].

Note that it actually isn’t used in any critical paths for Flink’s exactly-once processing guarantees, or partition to source parallel instance assignments.

So, the desired behavior in which you described is exactly the expected behavior for the Flink Kafka Consumer.

Cheers,
Gordon

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-consumers-start-position-configuration

On 19 July 2017 at 3:23:01 PM, Moiz Jinia ([hidden email]) wrote:

Below is a plan for downtime-free upgrade of a Flink job. The downstream
consumer of the Flink job is duplicate proof.

Scenario 1 -
1. Start Flink job A with consumer group G1 (12 slot job)
2. While job A is running, take a savepoint AS.
3. Start newer version of Flink job A' from savepoint AS with consumer group
*G1* (12 slot job again)
4. Stop job A.

Scenario 2 -
1. Start Flink job A with consumer group G1 (12 slot job)
2. While job A is running, take a savepoint AS.
3. Start newer version of Flink job A' from savepoint AS with consumer group
*G2* (12 slot job again)
4. Stop job A

Does it matter what consumer group job A' uses? The desired behavior is that
during the window when both A and A' are running, all messages should go to
both jobs. (And of course I want that job A' should start consuming from the
offsets in the savepoint and not the earliest).






--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Does-FlinkKafkaConsumer010-care-about-consumer-group-tp14323.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Reply | Threaded
Open this post in threaded view
|

Re: Does FlinkKafkaConsumer010 care about consumer group?

Moiz Jinia
Does this mean I can use the same consumer group G1 for the newer version A'? And inspite of same consumer group, A' will receive messages from all partitions when its started from savepoint?

I am using Flink 1.2.1. Does the above plan require setting uid on the Kafka source in the job?

Thanks,
Moiz

On Wed, Jul 19, 2017 at 1:06 PM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi!

The only occasions which the consumer group is used is:
1. When committing offsets back to Kafka. Since Flink 1.3, this can be disabled completely (both when checkpointing is enabled or disabled). See [1] on details about that.
2. When starting fresh (not starting from some savepoint), if you choose to use GROUP_OFFSETS as the start position, then the consumer group would also be used. If starting from a savepoint, then this is irrelevant. See [2].

Note that it actually isn’t used in any critical paths for Flink’s exactly-once processing guarantees, or partition to source parallel instance assignments.

So, the desired behavior in which you described is exactly the expected behavior for the Flink Kafka Consumer.

Cheers,
Gordon


On 19 July 2017 at 3:23:01 PM, Moiz Jinia ([hidden email]) wrote:

Below is a plan for downtime-free upgrade of a Flink job. The downstream
consumer of the Flink job is duplicate proof.

Scenario 1 -
1. Start Flink job A with consumer group G1 (12 slot job)
2. While job A is running, take a savepoint AS.
3. Start newer version of Flink job A' from savepoint AS with consumer group
*G1* (12 slot job again)
4. Stop job A.

Scenario 2 -
1. Start Flink job A with consumer group G1 (12 slot job)
2. While job A is running, take a savepoint AS.
3. Start newer version of Flink job A' from savepoint AS with consumer group
*G2* (12 slot job again)
4. Stop job A

Does it matter what consumer group job A' uses? The desired behavior is that
during the window when both A and A' are running, all messages should go to
both jobs. (And of course I want that job A' should start consuming from the
offsets in the savepoint and not the earliest).






--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Does-FlinkKafkaConsumer010-care-about-consumer-group-tp14323.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Reply | Threaded
Open this post in threaded view
|

Re: Does FlinkKafkaConsumer010 care about consumer group?

Tzu-Li (Gordon) Tai
Does this mean I can use the same consumer group G1 for the newer version A'? And inspite of same consumer group, A' will receive messages from all partitions when its started from savepoint?

Yes. That’s true. Flink internally uses static partition assignment, and the clients are assigned whatever partition states they are restored with.
The only “conflict” this would introduce is that both jobs will be competing offset committing to the same consumer group in Kafka (again, this doesn’t affect exactly-once but might mess up other external monitoring tools you may be using).

I am using Flink 1.2.1. Does the above plan require setting uid on the Kafka source in the job?

That would be a separate topic. Setting the UID of operators explicitly is usually always recommended before moving to production. See [1].

If your job code hasn’t changed across the restores, then it should be fine even if you didn’t set the UID.


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/ops/production_ready.html


On 19 July 2017 at 3:41:28 PM, Moiz S Jinia ([hidden email]) wrote:

Does this mean I can use the same consumer group G1 for the newer version A'? And inspite of same consumer group, A' will receive messages from all partitions when its started from savepoint?

I am using Flink 1.2.1. Does the above plan require setting uid on the Kafka source in the job?

Thanks,
Moiz

On Wed, Jul 19, 2017 at 1:06 PM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi!

The only occasions which the consumer group is used is:
1. When committing offsets back to Kafka. Since Flink 1.3, this can be disabled completely (both when checkpointing is enabled or disabled). See [1] on details about that.
2. When starting fresh (not starting from some savepoint), if you choose to use GROUP_OFFSETS as the start position, then the consumer group would also be used. If starting from a savepoint, then this is irrelevant. See [2].

Note that it actually isn’t used in any critical paths for Flink’s exactly-once processing guarantees, or partition to source parallel instance assignments.

So, the desired behavior in which you described is exactly the expected behavior for the Flink Kafka Consumer.

Cheers,
Gordon


On 19 July 2017 at 3:23:01 PM, Moiz Jinia ([hidden email]) wrote:

Below is a plan for downtime-free upgrade of a Flink job. The downstream
consumer of the Flink job is duplicate proof.

Scenario 1 -
1. Start Flink job A with consumer group G1 (12 slot job)
2. While job A is running, take a savepoint AS.
3. Start newer version of Flink job A' from savepoint AS with consumer group
*G1* (12 slot job again)
4. Stop job A.

Scenario 2 -
1. Start Flink job A with consumer group G1 (12 slot job)
2. While job A is running, take a savepoint AS.
3. Start newer version of Flink job A' from savepoint AS with consumer group
*G2* (12 slot job again)
4. Stop job A

Does it matter what consumer group job A' uses? The desired behavior is that
during the window when both A and A' are running, all messages should go to
both jobs. (And of course I want that job A' should start consuming from the
offsets in the savepoint and not the earliest).






--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Does-FlinkKafkaConsumer010-care-about-consumer-group-tp14323.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Reply | Threaded
Open this post in threaded view
|

Re: Does FlinkKafkaConsumer010 care about consumer group?

Moiz Jinia
Great thanks that was very helpful.

One last question -
If your job code hasn’t changed across the restores, then it should be fine even if you didn’t set the UID.

What kind of code change? What if the operator pipeline is still the same but there's a some business logic change? 


On Wed, Jul 19, 2017 at 1:16 PM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Does this mean I can use the same consumer group G1 for the newer version A'? And inspite of same consumer group, A' will receive messages from all partitions when its started from savepoint?

Yes. That’s true. Flink internally uses static partition assignment, and the clients are assigned whatever partition states they are restored with.
The only “conflict” this would introduce is that both jobs will be competing offset committing to the same consumer group in Kafka (again, this doesn’t affect exactly-once but might mess up other external monitoring tools you may be using).

I am using Flink 1.2.1. Does the above plan require setting uid on the Kafka source in the job?

That would be a separate topic. Setting the UID of operators explicitly is usually always recommended before moving to production. See [1].

If your job code hasn’t changed across the restores, then it should be fine even if you didn’t set the UID.


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/ops/production_ready.html


On 19 July 2017 at 3:41:28 PM, Moiz S Jinia ([hidden email]) wrote:

Does this mean I can use the same consumer group G1 for the newer version A'? And inspite of same consumer group, A' will receive messages from all partitions when its started from savepoint?

I am using Flink 1.2.1. Does the above plan require setting uid on the Kafka source in the job?

Thanks,
Moiz

On Wed, Jul 19, 2017 at 1:06 PM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
Hi!

The only occasions which the consumer group is used is:
1. When committing offsets back to Kafka. Since Flink 1.3, this can be disabled completely (both when checkpointing is enabled or disabled). See [1] on details about that.
2. When starting fresh (not starting from some savepoint), if you choose to use GROUP_OFFSETS as the start position, then the consumer group would also be used. If starting from a savepoint, then this is irrelevant. See [2].

Note that it actually isn’t used in any critical paths for Flink’s exactly-once processing guarantees, or partition to source parallel instance assignments.

So, the desired behavior in which you described is exactly the expected behavior for the Flink Kafka Consumer.

Cheers,
Gordon


On 19 July 2017 at 3:23:01 PM, Moiz Jinia ([hidden email]) wrote:

Below is a plan for downtime-free upgrade of a Flink job. The downstream
consumer of the Flink job is duplicate proof.

Scenario 1 -
1. Start Flink job A with consumer group G1 (12 slot job)
2. While job A is running, take a savepoint AS.
3. Start newer version of Flink job A' from savepoint AS with consumer group
*G1* (12 slot job again)
4. Stop job A.

Scenario 2 -
1. Start Flink job A with consumer group G1 (12 slot job)
2. While job A is running, take a savepoint AS.
3. Start newer version of Flink job A' from savepoint AS with consumer group
*G2* (12 slot job again)
4. Stop job A

Does it matter what consumer group job A' uses? The desired behavior is that
during the window when both A and A' are running, all messages should go to
both jobs. (And of course I want that job A' should start consuming from the
offsets in the savepoint and not the earliest).






--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Does-FlinkKafkaConsumer010-care-about-consumer-group-tp14323.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.