Processing single events for minimum latency

Pankaj Chand
Hi all,

What is the recommended way to build a Flink job that processes each event individually as soon as it arrives, without waiting for a window, in order to minimize latency across the entire DAG of operators?

For example, here is some sample WordCount code (without windows), followed by some approaches I know of:

 DataStream<WordWithCount> wordCounts = text
            .flatMap(new FlatMapFunction<String, WordWithCount>())
            .keyBy("word")
            .reduce(new ReduceFunction<WordWithCount>());
  1. Don't include any TimeWindow/CountWindow function (does this actually achieve what I want?)
  2. Use a CountWindow with a count of 1
  3. Make a Trigger that fires to process each event when it comes in
I think the above methods only affect processing and latency within a single operator, but do not affect the latency of an event across the entire Flink job's DAG, since they do not change the buffer timeout value.

Thanks,

Pankaj

Re: Processing single events for minimum latency

Piotr Nowojski-4
Hi Pankaj,

I'm not entirely sure if I understand your question.

If you want to minimize latency, you should avoid windows and any other operators that buffer data for long periods of time. You can still use windowing, but you might want to emit the updated value of the window for every processed record.

Other than that, you might also want to reduce `execution.buffer-timeout` from the default value of 100ms down to 1ms, or 0ms [1]. Is this what you are looking for?
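
To make the effect of the buffer timeout concrete, here is a toy model in plain Java. It is not Flink code and does not claim to mirror Flink's network stack: the class `OutputBufferModel`, its capacity of 1000 records, and the simulated millisecond clock are all invented for illustration. It only shows the trade-off described above: an output buffer is shipped when it is full or when the timeout has elapsed, so a smaller timeout bounds how long an already-processed record can sit in the buffer. In a real job the knob is `execution.buffer-timeout`, or `setBufferTimeout(...)` on the execution environment.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of an output buffer with a flush timeout (illustrative only, not Flink's implementation). */
public class OutputBufferModel {
    private final int capacity;        // hypothetical buffer size, counted in records
    private final long timeoutMillis;  // 0 means "flush after every record"
    private final List<Long> pending = new ArrayList<>(); // emit times of buffered records
    private long lastFlush = 0;
    private long maxWaitMillis = 0;    // longest time any record waited in the buffer

    public OutputBufferModel(int capacity, long timeoutMillis) {
        this.capacity = capacity;
        this.timeoutMillis = timeoutMillis;
    }

    /** An operator emits one record at simulated time nowMillis. */
    public void emit(long nowMillis) {
        pending.add(nowMillis);
        if (timeoutMillis == 0 || pending.size() >= capacity) {
            flush(nowMillis);
        }
    }

    /** Periodic timer check at simulated time nowMillis. */
    public void tick(long nowMillis) {
        if (!pending.isEmpty() && nowMillis - lastFlush >= timeoutMillis) {
            flush(nowMillis);
        }
    }

    private void flush(long nowMillis) {
        for (long emittedAt : pending) {
            maxWaitMillis = Math.max(maxWaitMillis, nowMillis - emittedAt);
        }
        pending.clear();
        lastFlush = nowMillis;
    }

    public long maxWaitMillis() {
        return maxWaitMillis;
    }

    /** Emits three records at t = 0, 10 and 20 ms and checks the timer every millisecond up to t = 200 ms. */
    public static long simulate(long timeoutMillis) {
        OutputBufferModel buffer = new OutputBufferModel(1000, timeoutMillis);
        long[] emitTimes = {0, 10, 20};
        int next = 0;
        for (long now = 0; now <= 200; now++) {
            while (next < emitTimes.length && emitTimes[next] == now) {
                buffer.emit(now);
                next++;
            }
            buffer.tick(now);
        }
        return buffer.maxWaitMillis();
    }

    public static void main(String[] args) {
        System.out.println("timeout 100 ms -> max wait " + simulate(100) + " ms");
        System.out.println("timeout   1 ms -> max wait " + simulate(1) + " ms");
        System.out.println("timeout   0 ms -> max wait " + simulate(0) + " ms");
    }
}
```

In this model the records are always "processed" at their emit time; only the moment they leave the buffer changes with the timeout. The price of a very small timeout is more, smaller flushes, which typically costs throughput.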

Piotrek


In reply to Pankaj Chand's message of Wed, 14 Oct 2020, 12:38.

Re: Processing single events for minimum latency

Pankaj Chand
Hi Piotrek,

Thank you for replying! I want to process each record as soon as it is ingested (or reaches an operator), without waiting for a window to collect more records. However, if I don't use windows, I am not sure whether each record gets emitted immediately after it is processed.

> You still can use windowing, but you might want to emit updated value of the window per every processed record.

How do I do this?

Also, is there any way to change `execution.buffer-timeout` (or call `setBufferTimeout(milliseconds)`) dynamically while the job is running?

Thank you,

Pankaj

In reply to Piotr Nowojski's message of Wed, Oct 14, 2020, 9:42 AM.

Re: Processing single events for minimum latency

Piotr Nowojski-4
Hi Pankaj,

Yes, you can fire a window for each element; take a look at Window Triggers [1].
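
To illustrate what firing per element means, here is a plain-Java toy model rather than the Flink API; `WindowModel` and its methods are hypothetical names. It contrasts a window that emits once when it closes with one that fires on every element and emits the updated running aggregate, which is roughly the behaviour to expect from a per-element trigger such as Flink's `CountTrigger.of(1)`.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a summing window with two firing policies (illustrative only, not Flink code). */
public class WindowModel {
    private final boolean fireOnEveryElement;
    private int sum = 0;                                  // window state, kept between firings
    private final List<Integer> emitted = new ArrayList<>();

    public WindowModel(boolean fireOnEveryElement) {
        this.fireOnEveryElement = fireOnEveryElement;
    }

    /** Adds one element to the window; emits the running sum right away in per-element mode. */
    public void add(int value) {
        sum += value;
        if (fireOnEveryElement) {
            emitted.add(sum);
        }
    }

    /** Closes the window; only the default policy emits here, since the per-element policy replaces it. */
    public void close() {
        if (!fireOnEveryElement) {
            emitted.add(sum);
        }
    }

    public List<Integer> emitted() {
        return emitted;
    }

    /** Feeds all values into one window, closes it, and returns everything that was emitted. */
    public static List<Integer> run(boolean fireOnEveryElement, int... values) {
        WindowModel window = new WindowModel(fireOnEveryElement);
        for (int value : values) {
            window.add(value);
        }
        window.close();
        return window.emitted();
    }

    public static void main(String[] args) {
        System.out.println("fire at window end: " + run(false, 1, 2, 3));
        System.out.println("fire per element:   " + run(true, 1, 2, 3));
    }
}
```

With per-element firing, downstream operators see one result per input record instead of one per window, so the window no longer adds waiting time, at the cost of more output records.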

Flink always processes records immediately. The only things that can delay the processing of elements are:
- buffering elements in the operator's state (see WindowOperator)
- buffer-timeout (but that applies to the output, so it does not delay processing per se)
- back pressure
- exactly-once checkpointing (especially under back pressure)
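
The first statement also covers option 1 from the original question: a non-windowed `keyBy(...).reduce(...)` is a rolling reduce, which emits an updated result for every input record instead of waiting for a batch. A plain-Java sketch of that behaviour follows; it is not Flink code, and `RollingWordCount` and the `word=count` output format are invented for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a keyed rolling reduce: one updated result per input record (illustrative only). */
public class RollingWordCount {
    private final Map<String, Integer> counts = new HashMap<>(); // per-key state

    /** Processes one word and immediately returns the updated result for its key. */
    public String process(String word) {
        int updated = counts.merge(word, 1, Integer::sum);
        return word + "=" + updated;
    }

    /** Feeds the words through one instance and collects the result emitted for each record. */
    public static List<String> run(String... words) {
        RollingWordCount job = new RollingWordCount();
        List<String> out = new ArrayList<>();
        for (String word : words) {
            out.add(job.process(word));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run("to", "be", "or", "not", "to", "be"));
    }
}
```

Every call to `process` yields an output, so nothing is held back in operator state; any remaining delay comes from the other items in the list above.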

> Also, is there any way I can change the execution.buffer-timeout or setbuffertimeout(milliseconds) dynamically while the job is running?

No, sorry, it's not possible :(

Best,
Piotrek


In reply to Pankaj Chand's message of Thu, 15 Oct 2020, 01:55.

Re: Processing single events for minimum latency

Pankaj Chand
Thank you for the quick and informative reply, Piotrek!

In reply to Piotr Nowojski's message of Thu, Oct 15, 2020, 2:09 AM.

Re: Processing single events for minimum latency

Piotr Nowojski-4
No problem :)

Piotrek

In reply to Pankaj Chand's message of Thu, 15 Oct 2020, 08:18.