Empty state restore seems to be broken for Kafka source (1.3.2)

Gyula Fóra-2
Hi all,

We are running into some problems with the Kafka source after changing the uid and restoring from a savepoint.
We expect the partition state to be cleared and set up all over again, but what seems to happen is that the consumer thinks it doesn't have any partitions assigned.


What is the expected behaviour here?

Thanks!
Gyula

Re: Empty state restore seems to be broken for Kafka source (1.3.2)

Stefan Richter
Thanks for the report, I will take a look.

On 06.09.2017 at 11:48, Gyula Fóra <[hidden email]> wrote:

Hi all,

We are running into some problems with the Kafka source after changing the uid and restoring from a savepoint.
We expect the partition state to be cleared and set up all over again, but what seems to happen is that the consumer thinks it doesn't have any partitions assigned.


What is the expected behaviour here?

Thanks!
Gyula


Re: Empty state restore seems to be broken for Kafka source (1.3.2)

Aljoscha Krettek
The problem here is that context.isRestored() is a global flag and not local to each operator. It says "yes this job was restored" but the source would need to know that it is actually brand new and never had any state. This is quite tricky to do, since there is currently no way (if I'm correct) to differentiate between "I got empty state but others maybe got state" and "this source never had state and neither had other parallel instances".
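
To make the ambiguity concrete, here is a minimal, self-contained sketch. It is not Flink code: `InitView`, `Decision` and `decide` are hypothetical names that only model what a single source subtask can observe, namely a job-wide "restored" flag and whatever partition state it was handed.

```java
import java.util.Collections;
import java.util.List;

public class RestoreAmbiguity {

    /** Hypothetical model of what one source subtask sees when it initializes. */
    static final class InitView {
        final boolean restored;                 // job-wide flag, analogous to context.isRestored()
        final List<String> restoredPartitions;  // partition state handed to this subtask

        InitView(boolean restored, List<String> restoredPartitions) {
            this.restored = restored;
            this.restoredPartitions = restoredPartitions;
        }
    }

    enum Decision { FRESH_START, RESUME, AMBIGUOUS }

    /** Decide what to do using only the information available to the subtask. */
    static Decision decide(InitView view) {
        if (!view.restored) {
            return Decision.FRESH_START;   // job was not restored at all
        }
        if (!view.restoredPartitions.isEmpty()) {
            return Decision.RESUME;        // this subtask received partition state
        }
        // Restored job, empty state: brand-new source, or an old one with nothing assigned?
        return Decision.AMBIGUOUS;
    }

    public static void main(String[] args) {
        // Case A: the uid changed, so no subtask of this source finds any state.
        InitView renamedSource = new InitView(true, Collections.emptyList());
        // Case B: the source had state, but this subtask got none of it.
        InitView idleSubtask = new InitView(true, Collections.emptyList());

        System.out.println(decide(renamedSource)); // AMBIGUOUS
        System.out.println(decide(idleSubtask));   // AMBIGUOUS
    }
}
```

Both cases present identical inputs to the subtask, which is why the flag alone cannot tell them apart in this model.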

Best,
Aljoscha

On 6. Sep 2017, at 13:56, Stefan Richter <[hidden email]> wrote:

Thanks for the report, I will take a look.


Re: Empty state restore seems to be broken for Kafka source (1.3.2)

Gyula Fóra-2
Wouldn't it be enough for the Kafka sources to store an empty container as their state when it is empty, as opposed to null when the source should be bootstrapped again?

Gyula

Aljoscha Krettek <[hidden email]> wrote (on Wed, 6 Sep 2017, 14:31):
The problem here is that context.isRestored() is a global flag and not local to each operator. It says "yes this job was restored" but the source would need to know that it is actually brand new and never had any state. This is quite tricky to do, since there is currently no way (if I'm correct) to differentiate between "I got empty state but others maybe got state" and "this source never had state and neither had other parallel instances".

Best,
Aljoscha


Re: Empty state restore seems to be broken for Kafka source (1.3.2)

Aljoscha Krettek
Yes, and that's essentially what's happening in the 1.4-SNAPSHOT consumer, which also has discovery of new partitions. Starting from 1.4-SNAPSHOT we store state in a union state, i.e. all sources get all partitions on restore, and if they didn't get any they know that they are new. There is no specific logic for detecting this situation; the partition discoverer is simply seeded with this information, so when it discovers a new partition it knows whether it can take ownership of that partition.
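
A rough sketch of that idea, under stated assumptions: this is not the actual consumer code. `owns` uses a simple modulo rule as a stand-in for the real assignment logic, and `newPartitionsFor` is a hypothetical helper. It only shows how seeding each subtask with the full (union) set of restored partitions lets it recognise which discovered partitions are new and whether it should pick them up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class UnionStateSketch {

    /** Assumed assignment rule (illustrative only): partition p goes to subtask p % parallelism. */
    static boolean owns(int partition, int subtaskIndex, int parallelism) {
        return partition % parallelism == subtaskIndex;
    }

    /**
     * Partitions this subtask should start reading as new.
     *
     * @param unionState every partition restored by any subtask (each subtask sees all of them)
     * @param discovered partitions that currently exist in Kafka
     */
    static List<Integer> newPartitionsFor(
            Set<Integer> unionState, List<Integer> discovered, int subtaskIndex, int parallelism) {
        List<Integer> result = new ArrayList<>();
        for (int partition : discovered) {
            // Known from restored state: not new, handled via its restored offsets (not modelled here).
            if (!unionState.contains(partition) && owns(partition, subtaskIndex, parallelism)) {
                result.add(partition);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> discovered = Arrays.asList(0, 1, 2, 3);

        // Restored state already covers partitions 0..2, so only partition 3 is new.
        Set<Integer> restored = new HashSet<>(Arrays.asList(0, 1, 2));
        System.out.println(newPartitionsFor(restored, discovered, 0, 2)); // []
        System.out.println(newPartitionsFor(restored, discovered, 1, 2)); // [3]

        // Empty union state (e.g. a brand-new source): every partition counts as new.
        Set<Integer> empty = Collections.emptySet();
        System.out.println(newPartitionsFor(empty, discovered, 0, 2)); // [0, 2]
        System.out.println(newPartitionsFor(empty, discovered, 1, 2)); // [1, 3]
    }
}
```

Because every subtask sees the same union of restored partitions, an empty union unambiguously means "no instance of this source had state", which is the distinction the global flag could not provide.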

I'm sure Gordon (cc'ed) could explain it better than I did.

On 6. Sep 2017, at 14:36, Gyula Fóra <[hidden email]> wrote:

Wouldn't it be enough for the Kafka sources to store an empty container as their state when it is empty, as opposed to null when the source should be bootstrapped again?

Gyula


Re: Empty state restore seems to be broken for Kafka source (1.3.2)

Aljoscha Krettek
After Stefan and I discussed this, we think that this should actually work.

Do you have the log output from restoring the Kafka Consumer? It would be interesting to see whether any of those print:

On 6. Sep 2017, at 14:45, Aljoscha Krettek <[hidden email]> wrote:

Yes, and that's essentially what's happening in the 1.4-SNAPSHOT consumer, which also has discovery of new partitions. Starting from 1.4-SNAPSHOT we store state in a union state, i.e. all sources get all partitions on restore, and if they didn't get any they know that they are new. There is no specific logic for detecting this situation; the partition discoverer is simply seeded with this information, so when it discovers a new partition it knows whether it can take ownership of that partition.

I'm sure Gordon (cc'ed) could explain it better than I did.


Re: Empty state restore seems to be broken for Kafka source (1.3.2)

Aljoscha Krettek
After a bit more digging I found that the "isRestored" flag doesn't work correctly if there are operators chained to the sink that have state: https://issues.apache.org/jira/browse/FLINK-7623

Blocker issue for 1.3.3 and 1.4.0.

Best,
Aljoscha

On 6. Sep 2017, at 16:05, Aljoscha Krettek <[hidden email]> wrote:

After Stefan and I discussed this, we think that this should actually work.

Do you have the log output from restoring the Kafka Consumer? It would be interesting to see whether any of those print:


Re: Empty state restore seems to be broken for Kafka source (1.3.2)

Gyula Fóra-2
Good job figuring this out!
This certainly seems to explain our problems.

Thanks!
Gyula

Aljoscha Krettek <[hidden email]> wrote (on Thu, 14 Sep 2017, 14:46):
After a bit more digging I found that the "isRestored" flag doesn't work correctly if there are operators chained to the sink that have state: https://issues.apache.org/jira/browse/FLINK-7623

Blocker issue for 1.3.3 and 1.4.0.

Best,
Aljoscha


Re: Empty state restore seems to be broken for Kafka source (1.3.2)

Gyula Fóra-2
Hey,

I know it's an old discussion, but there also seems to be a problem with the logic in the Kafka source alone regarding new topics added after a checkpoint.

Maybe there is a ticket for this already and I just missed it.

Cheers,
Gyula

Gyula Fóra <[hidden email]> wrote (on Thu, 14 Sep 2017, 14:53):
Good job figuring this out!
This certainly seems to explain our problems.

Thanks!
Gyula


Re: Empty state restore seems to be broken for Kafka source (1.3.2)

Aljoscha Krettek
It might be old but it's not forgotten: the issue I created is marked as a blocker, so we won't forget it when releasing 1.3.3 and 1.4.0.

Is the issue in Kafka about new topics/partitions not being discovered, or something else? Not discovering them would be the expected behaviour in Flink < 1.4.0.
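
For context, a hedged sketch of how discovery is switched on in the 1.4 consumer. As far as the 1.4 Kafka connector documentation describes it, discovery is disabled by default and is enabled by setting a non-negative value for the `flink.partition-discovery.interval-millis` key in the properties passed to the consumer; please verify this against the version in use. The broker address, group id and the `consumerProperties` helper are placeholders, and the snippet only builds the `Properties` object without creating a consumer.

```java
import java.util.Properties;

public class DiscoveryConfig {

    /** Builds consumer properties with partition discovery enabled (helper name is hypothetical). */
    static Properties consumerProperties(long discoveryIntervalMillis) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.setProperty("group.id", "example-group");           // placeholder consumer group
        // Assumed key from the 1.4 connector docs: how often to look for new partitions.
        props.setProperty(
                "flink.partition-discovery.interval-millis",
                Long.toString(discoveryIntervalMillis));
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProperties(10_000L);
        System.out.println(props.getProperty("flink.partition-discovery.interval-millis")); // 10000
    }
}
```

These properties would then be handed to the Kafka consumer constructor in a 1.4 job; on 1.3.x the key has no effect, since the discovery feature is not there.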

Best,
Aljoscha

On 12. Oct 2017, at 16:40, Gyula Fóra <[hidden email]> wrote:

Hey,

I know it's an old discussion, but there also seems to be a problem with the logic in the Kafka source alone regarding new topics added after a checkpoint.

Maybe there is a ticket for this already and I just missed it.

Cheers,
Gyula


Re: Empty state restore seems to be broken for Kafka source (1.3.2)

Gyula Fóra-2

Ok, thanks for the clarification. :)

Gyula


On Thu, Oct 12, 2017, 17:04 Aljoscha Krettek <[hidden email]> wrote:
It might be old but it's not forgotten: the issue I created is marked as a blocker, so we won't forget it when releasing 1.3.3 and 1.4.0.

Is the issue in Kafka about new topics/partitions not being discovered, or something else? Not discovering them would be the expected behaviour in Flink < 1.4.0.

Best,
Aljoscha


Re: Empty state restore seems to be broken for Kafka source (1.3.2)

Aljoscha Krettek
Still not nice, though, and it took a while to finalise discovery for 1.4. ;-)

If you need that now, you might be able to backport the 1.4 consumer to 1.3.

On 12. Oct 2017, at 17:05, Gyula Fóra <[hidden email]> wrote:

Ok, thanks for the clarification. :)

Gyula

