Is outputting from components other than sinks or side outputs a no-no?


Tom Fennelly
Hi.

What are the negative side effects of (for example) a filter function occasionally making a call out to a DB? Is this a big no-no, and should all output be done through sinks and side outputs, no exceptions?

Regards,

Tom.

Re: Is outputting from components other than sinks or side outputs a no-no?

David Anderson-3
Every job is required to have a sink, but there's no requirement that all output be done via sinks. It's not uncommon, and doesn't have to cause problems, to have other operators that do I/O.

What can be problematic, however, is doing blocking I/O. While your user function is blocked, the function will exert back pressure, and checkpoint barriers will be unable to make any progress. This sometimes leads to checkpoint timeouts and job failures. So it's recommended to make any I/O you do asynchronous, using an AsyncFunction [1] or something similar.
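As a rough illustration of the difference, here is a minimal plain-Java sketch with no Flink dependency. It is not Flink's AsyncFunction API; `blockingLookup`, `lookupAsync` and `enrichAll` are hypothetical names, and the sleep stands in for a slow DB call. It only shows the shape of a client call that returns a future instead of blocking the caller, which is the kind of call one would typically wrap in an AsyncFunction.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class AsyncLookupSketch {

    // Hypothetical stand-in for a slow DB call (assumption, not a real client).
    static String blockingLookup(String key) {
        try {
            Thread.sleep(50); // simulated network latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return key + "-enriched";
    }

    // Starts the lookup on the pool and returns immediately with a future.
    static CompletableFuture<String> lookupAsync(String key, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> blockingLookup(key), pool);
    }

    // Fires all requests first, then gathers the results in input order.
    static List<String> enrichAll(List<String> keys, ExecutorService pool) {
        List<CompletableFuture<String>> pending = keys.stream()
                .map(k -> lookupAsync(k, pool))
                .collect(Collectors.toList());
        // join() is used only so this demo can return a plain list. In an
        // asynchronous operator you would register a callback on the future
        // (e.g. thenAccept) instead of waiting on it.
        return pending.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println(enrichAll(List.of("a", "b", "c"), pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the three requests are in flight at the same time, the total wait is close to one lookup rather than three.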

Note that the asynchronous I/O function stores the records for in-flight asynchronous requests in checkpoints, and restores and re-triggers those requests when recovering from a failure. This might lead to duplicate results if you are using it to do non-idempotent database writes. If you need transactions, use a sink that offers them.
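To see why idempotency matters here, consider a small plain-Java sketch in which the same request is issued twice, once before a failure and once after recovery. The two in-memory "tables" and the method names are assumptions made up for illustration; they stand in for an append-style insert and a keyed upsert against a real database.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ReplayWriteSketch {

    // Non-idempotent write: every call appends a new row.
    static void appendWrite(List<String> table, String recordId, String value) {
        table.add(recordId + "=" + value);
    }

    // Idempotent write: a keyed upsert, so repeating the call changes nothing.
    static void upsertWrite(Map<String, String> table, String recordId, String value) {
        table.put(recordId, value);
    }

    public static void main(String[] args) {
        List<String> appendTable = new ArrayList<>();
        Map<String, String> upsertTable = new HashMap<>();

        // The same in-flight request is triggered twice: once before the
        // failure and once more when it is re-triggered after recovery.
        for (int attempt = 0; attempt < 2; attempt++) {
            appendWrite(appendTable, "order-42", "shipped");
            upsertWrite(upsertTable, "order-42", "shipped");
        }

        System.out.println("append rows: " + appendTable.size()); // 2 (duplicate)
        System.out.println("upsert rows: " + upsertTable.size()); // 1
    }
}
```

Keying the write on a stable record ID is one common way to make a re-triggered request harmless; it does not replace a transactional sink where real transactions are required.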


[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html

Best,
David


Re: Is outputting from components other than sinks or side outputs a no-no?

Tom Fennelly
Thank you David.

In the case we have in mind, it should only happen on the very rare exception, i.e. if an otherwise uncaught exception occurs, we want to send the record to a DLQ (dead-letter queue) and handle the retry manually, rather than relying on checkpointing and restarting.

Regards,

Tom.



Re: Is outputting from components other than sinks or side outputs a no-no?

Stephen Connolly-2
I am not 100% certain that David is talking about the same usage pattern that you are, Tom.

David, the pattern Tom is talking about is something like this...

try {
  do something with record
} catch (SomeException e) {
  push record to DLQ
}

My concern is what happens if we have a different failure, or even a restart from a checkpoint because, say, the task manager OOM'd or was killed. The record is then replayed, and this time the "do something with record" step succeeds, but the record is still on the DLQ from last time.

If the DLQ is a Flink-native output that pushes to an exactly-once sink, then you do not have that issue. When you roll your own side output behind Flink's back, you have to take all of those possibilities into account, which significantly complicates the code.
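The scenario can be simulated with a small plain-Java sketch. Everything in it is hypothetical: the `dlq` list stands in for an external queue that a restore from checkpoint does not roll back, and the `dependencyUp` flag stands in for whatever made the first attempt fail.

```java
import java.util.ArrayList;
import java.util.List;

class DlqReplaySketch {

    // External dead-letter queue, written directly from the function, so a
    // restore from checkpoint does not undo what was pushed to it.
    final List<String> dlq = new ArrayList<>();

    // Records that were processed successfully.
    final List<String> processed = new ArrayList<>();

    // Hypothetical processing step that fails while a dependency is down.
    public void process(String record, boolean dependencyUp) {
        try {
            if (!dependencyUp) {
                throw new IllegalStateException("dependency unavailable");
            }
            processed.add(record);
        } catch (IllegalStateException e) {
            dlq.add(record); // push record to DLQ
        }
    }

    // Records that are on the DLQ even though a later replay processed them.
    public List<String> staleDlqEntries() {
        List<String> stale = new ArrayList<>(dlq);
        stale.retainAll(processed);
        return stale;
    }

    public static void main(String[] args) {
        DlqReplaySketch job = new DlqReplaySketch();
        job.process("r1", false); // first attempt fails, r1 goes to the DLQ
        // An unrelated failure restarts the job from a checkpoint taken
        // before r1 was processed, so r1 is replayed.
        job.process("r1", true);  // replay succeeds
        System.out.println(job.staleDlqEntries()); // [r1]
    }
}
```

Whoever consumes the DLQ therefore has to cope with entries like `r1` that were handled successfully on a later attempt.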


Re: Is outputting from components other than sinks or side outputs a no-no?

Tom Fennelly
No, I think David answered the specific question that I asked, i.e. whether it is okay for operators other than sinks and side outputs to do I/O. Purging DLQ entries is something we'll need to be able to do anyway (for some scenarios, aside from successful checkpoint retries), and I specifically wasn't asking about sink functions.
