Recommended pattern for implementing a DLQ with Flink+Kafka

7 messages

Recommended pattern for implementing a DLQ with Flink+Kafka

Tom Fennelly
Hi.

I've been searching blogs, etc., trying to see if there are established patterns/mechanisms for reprocessing failed messages via something like a DLQ (dead letter queue). I've read about using checkpointing and restarting tasks (not what we want, because we want to keep processing forward), and also how some use side outputs to filter "bad" data to a DLQ-style topic. Kafka has dead letter topic configs too, but it seems they can't really be used from inside Flink (from what I can see).

We're running Flink+Kafka. I'm new to both, so I'm not sure if there just isn't a defined pattern for this, or if I'm just not asking the right questions in my searches. I searched the archives here and don't see anything either, which obviously makes me think that I'm not thinking about this in the "Flink way" :-|

Regards,

Tom.

Re: Recommended pattern for implementing a DLQ with Flink+Kafka

Eleanore Jin
+1 we have a similar use case for message schema validation.

Eleanore


RE: Recommended pattern for implementing a DLQ with Flink+Kafka

Chen Qin

Could you be more specific about what "failed message" means here?

In general, a side output can do something like this:

    def process(ele) {
       try {
            biz(ele)            // normal business logic
       } catch (e) {
            // route the failed element plus exception context to a side output
            ctx.output(tag, (ele, e))
       }
    }

    stream.process(func).getSideOutput(tag).addSink(kafkaSink)

 

Thanks,

Chen
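The routing idea in the pseudocode above can be sketched concretely in plain Python (no Flink APIs; `biz` and the dead-letter list are illustrative stand-ins for the real business logic and the Kafka DLQ sink):

```python
def biz(record):
    # Illustrative business logic: fails on records missing a "value" field.
    return record["value"] * 2

def process(records, dead_letters):
    """Process records; route failures to a dead-letter collection
    instead of failing the job, so processing keeps moving forward."""
    results = []
    for record in records:
        try:
            results.append(biz(record))
        except Exception as e:
            # Capture the failed record plus exception context for the DLQ.
            dead_letters.append({"record": record, "error": repr(e)})
    return results

dlq = []
out = process([{"value": 1}, {"bad": True}, {"value": 3}], dlq)
# out keeps the good results; dlq holds the one failed record with its error.
```

In the real job the `dead_letters` collection would be the side-output stream, sunk to a separate Kafka topic.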

 

 

 



Re: Recommended pattern for implementing a DLQ with Flink+Kafka

Tom Fennelly
Thanks Chen.

I'm thinking about errors that occur while processing a record/message that shouldn't be retried until after some "action" has been taken, versus flooding the system with pointless retries, e.g.:
  • A processing step might involve an API call to an external system, and that system is down at the moment, so there's no point retrying until further notice. For this we want to be able to send something to a DLQ.
  • We have some bad code that is resulting in an uncaught exception in very specific cases. We want these to go to a DLQ and only be retried after the appropriate fix has been made.
The possible scenarios for this are numerous, so I think my main question would be: are there established general Flink patterns or best practices that can be applied for this, or is it something we'd need to hand-roll on a case-by-case basis with a side-output-type solution such as in your example? We can do that, but I just wanted to make sure I wasn't missing anything before heading down that road.

Regards,

Tom.
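One way to keep the two failure cases above distinguishable downstream is to wrap each dead letter in an envelope carrying the exception context and a retry hint. A minimal sketch (the field names here are assumptions for illustration, not an established schema):

```python
import time

def make_dead_letter(record, exc, retriable):
    """Build the envelope written to the DLQ topic: the original record plus
    enough context to decide later when/whether to retry it."""
    return {
        "payload": record,
        "error_type": type(exc).__name__,
        "error_message": str(exc),
        # True: transient external outage, safe to replay later.
        # False: needs a code fix before any retry makes sense.
        "retriable": retriable,
        "failed_at": time.time(),
    }

dl = make_dead_letter({"id": 42}, ValueError("bad schema"), retriable=False)
```

A separate replay job can then filter the DLQ topic on `retriable` (or on `error_type` after a fix ships) instead of blindly re-feeding everything.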




Re: Recommended pattern for implementing a DLQ with Flink+Kafka

Robert Metzger
Hey Tom,

I'm not aware of any patterns for this problem, sorry. Intuitively, I would send dead letters to a separate Kafka topic.

Best,
Robert




Re: Recommended pattern for implementing a DLQ with Flink+Kafka

Tom Fennelly
Thanks Robert.



Re: Recommended pattern for implementing a DLQ with Flink+Kafka

Arvid Heise
Hi Tom,

using side outputs is actually the established Flink pattern in that regard. The advantage of a side output is that you do not depend on the DLQ concept of the source system, which is incredibly useful if you read from multiple systems.

Most commonly, the side output is then written to another Kafka topic, together with the respective exception, to be inspected manually (for broken data / programming errors).

For external systems, you'd usually use a retry if the call is part of an enrichment, because the follow-up steps often depend on it. If the external system is just used as a sink and only rarely experiences outages, I'd recommend using a sink with retries, or at least reverting to async I/O.
If the external system is only used optionally, or behaves very flakily, then having a DLQ with a separate retry topology/job is very valid.

Since you can attach a complete Flink program to the side output, you can also add recovery logic to it. For example, you could fall back to a different external system, such as a different geolocation service, possibly maintained by an external provider (so your internal service is more or less used as a cache to save money).
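The fallback idea can be sketched as: try the primary enrichment, consult a secondary provider on failure, and only dead-letter the record when both fail. A plain-Python illustration (both services here are hypothetical stubs, and the geolocation use case is just the example above):

```python
def enrich_with_fallback(record, primary, fallback, dead_letters):
    """Enrich a record via the primary service; on failure try the fallback
    provider; only if both fail does the record go to the DLQ."""
    last_error = None
    for service in (primary, fallback):
        try:
            return {**record, "geo": service(record["ip"])}
        except Exception as e:
            last_error = e
    dead_letters.append({"record": record, "error": repr(last_error)})
    return None

# Stubs for illustration: the primary service is down, the fallback works.
def primary(ip):
    raise ConnectionError("primary geo service down")

def fallback(ip):
    return "EU"

dlq = []
enriched = enrich_with_fallback({"ip": "10.0.0.1"}, primary, fallback, dlq)
```

In a real job the fallback call would typically go through Flink's async I/O so a slow secondary provider doesn't stall the whole pipeline.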




--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng