Hi Flink Users,
We have a scenario where we're reading from multiple Kafka topics using a single Kafka consumer. Each topic has a very different ingestion rate; for example, CheckoutTopic has 500 rec/sec while PageViewTopic has 10,000 rec/sec. We order these events across topics using a keyed process function (keyed on userId) and an EVENT_TIME watermark based on the ingestionTs of the record, captured just before it is produced into Kafka.

On live data this pipeline works perfectly, but if I restart the job from an old savepoint (say 24 hours old), the job fills up its state, full back pressure (ratio 1) builds up on the source operators, checkpoints start failing, and the job eventually dies. My hypothesis is that data from both topics is read at the maximum possible rate, but since the watermark from PageViewTopic lags significantly behind CheckoutTopic's, the overall watermark doesn't progress; excessive data from CheckoutTopic fills up the state and results in the failure mentioned above.

I also observed this while backfilling from a savepoint using a single topic: even though watermarks do progress faster than before, the job meets the same fate. In this case I'm assuming the offsets/watermarks of the individual partitions go out of sync with respect to time, leading to a situation similar to the one described above.

Is this understanding correct? Is there a known solution for this? And if not, what is the suggested approach to tackle this problem?

Thanks & Regards,
Akshay Aggarwal
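A minimal sketch of the kind of pipeline described above, not the poster's actual code: it assumes a hypothetical Event POJO with userId and ingestionTs fields, an EventDeserializationSchema, and a CrossTopicOrderingFunction, all of which are illustrative names. It uses the DataStream API of the Flink 1.9/1.10 era this thread dates from.

// Sketch of a single consumer reading two topics with very different rates,
// keyed by userId and ordered by event time in a keyed process function.
// Event, EventDeserializationSchema and CrossTopicOrderingFunction are
// hypothetical placeholders.
import java.util.Arrays;
import java.util.Properties;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class CrossTopicOrderingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");
        props.setProperty("group.id", "cross-topic-ordering");

        // One consumer subscribed to both topics, as in the question.
        FlinkKafkaConsumer<Event> consumer = new FlinkKafkaConsumer<>(
                Arrays.asList("CheckoutTopic", "PageViewTopic"),
                new EventDeserializationSchema(),
                props);

        env.addSource(consumer)
                // Event-time watermarks derived from the ingestion timestamp
                // set just before the record was produced into Kafka.
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(5)) {
                            @Override
                            public long extractTimestamp(Event e) {
                                return e.ingestionTs;
                            }
                        })
                .keyBy(e -> e.userId)
                // Buffers events per user and emits them in event-time order
                // using event-time timers.
                .process(new CrossTopicOrderingFunction())
                .print();

        env.execute("cross-topic-ordering");
    }
}

Note that assigning the timestamps/watermarks on the consumer itself (consumer.assignTimestampsAndWatermarks(...)) instead of on the stream would give per-Kafka-partition watermarks, which is relevant to the single-topic backfill case mentioned in the question.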
Hi,
Hi,

I'm afraid your analysis is 100% correct. Currently there's no out-of-the-box feature for dealing with this, but our work on a new source interface [1] will enable us to add a feature that we call "event-time alignment", where source readers would slow down reading from certain source partitions if their watermark advances too far beyond the minimum watermark over all source partitions.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

Best,
Aljoscha
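To make the alignment idea concrete, here is a small, self-contained sketch of the decision such a reader could make: pause any split whose local watermark has drifted more than a configured amount ahead of the slowest split. The class and method names are made up for illustration and are not Flink APIs.

// Illustrative sketch of "event-time alignment": stop reading from splits
// that have run too far ahead in event time until the slow ones catch up.
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class AlignmentSketch {

    /** Returns the splits that should be paused, given per-split watermarks. */
    static Set<String> splitsToPause(Map<String, Long> watermarkPerSplit, long maxDriftMs) {
        long minWatermark = watermarkPerSplit.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
        Set<String> paused = new HashSet<>();
        for (Map.Entry<String, Long> e : watermarkPerSplit.entrySet()) {
            if (e.getValue() - minWatermark > maxDriftMs) {
                // This split is too far ahead of the slowest one; pausing it
                // bounds the amount of data buffered downstream while the
                // overall watermark cannot progress.
                paused.add(e.getKey());
            }
        }
        return paused;
    }

    public static void main(String[] args) {
        Map<String, Long> wm = new HashMap<>();
        wm.put("CheckoutTopic-0", 1_000_000L); // low-volume topic races ahead in event time
        wm.put("PageViewTopic-0",   100_000L); // high-volume topic lags behind
        System.out.println(splitsToPause(wm, 60_000L)); // -> [CheckoutTopic-0]
    }
}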
Thanks Aljoscha. Is there a JIRA where this is getting tracked?

~Akshay
You can find more information here:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
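For what it's worth, the alignment feature discussed in this thread later shipped on top of the FLIP-27 sources as watermark alignment (Flink 1.15+, with per-split pausing from 1.17). A minimal sketch of how it is configured, reusing the hypothetical Event POJO, EventDeserializationSchema and CrossTopicOrderingFunction from the first example:

// Sketch only: requires Flink 1.15+ and the flink-connector-kafka KafkaSource.
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AlignedBackfillJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<Event> source = KafkaSource.<Event>builder()
                .setBootstrapServers("kafka:9092")
                .setTopics("CheckoutTopic", "PageViewTopic")
                .setGroupId("cross-topic-ordering")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new EventDeserializationSchema())
                .build();

        WatermarkStrategy<Event> strategy = WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, recordTs) -> event.ingestionTs)
                // Readers whose watermark runs more than 1 minute ahead of the
                // group's minimum are throttled until the others catch up.
                .withWatermarkAlignment("ordering-group", Duration.ofMinutes(1), Duration.ofSeconds(1));

        env.fromSource(source, strategy, "kafka")
                .keyBy(event -> event.userId)
                .process(new CrossTopicOrderingFunction())
                .print();

        env.execute("aligned-backfill");
    }
}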