Event time didn't advance because of some idle slots

Soheil Pourbafrani

In Flink event-time mode, I use periodic watermarks to advance event time. Each slot extracts the event time from incoming messages and, to emit a watermark, subtracts a network delay from it, say 3000 ms.

@Override
public Watermark getCurrentWatermark() {
    // Trail the largest event timestamp seen by this subtask by DELAY ms
    return new Watermark(maxTimestamp - DELAY);
}
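To make the per-slot behaviour concrete, here is a minimal plain-Java model of the assigner described above, with no Flink dependency. The class name `BoundedDelayAssigner` and the initial maximum of 0 are assumptions chosen to reproduce the numbers in this post. Each parallel instance keeps its own maximum, so a slot that never receives a record keeps reporting `0 - DELAY`.

```java
/** Hypothetical stand-in for one parallel instance of a periodic watermark assigner. */
public class BoundedDelayAssigner {
    private static final long DELAY = 3000L; // assumed network delay in ms
    private long maxTimestamp = 0L;          // assumed initial value, matching "0 - 3000" in the post

    /** Called for every record this instance receives; returns the record's event timestamp. */
    public long extractTimestamp(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        return eventTimestamp;
    }

    /** Called periodically, whether or not any record arrived since the last call. */
    public long getCurrentWatermark() {
        return maxTimestamp - DELAY;
    }

    public static void main(String[] args) {
        BoundedDelayAssigner busy = new BoundedDelayAssigner();
        BoundedDelayAssigner idle = new BoundedDelayAssigner();
        busy.extractTimestamp(1541217662806L);
        System.out.println(busy.getCurrentWatermark()); // 1541217659806
        System.out.println(idle.getCurrentWatermark()); // -3000
    }
}
```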

I have 4 active slots. The problem is that only two slots receive incoming data, but all of them call getCurrentWatermark(). So consider a case where threads 1 and 2 receive data and threads 3 and 4 do not:

Thread-1-watermark ---> 1541217659806
Thread-2-watermark ---> 1541217659810
Thread-3-watermark ---> (0 - 3000 = -3000)
Thread-4-watermark ---> (0 - 3000 = -3000) 

Since Flink takes the lowest of these watermarks as the overall watermark, event time never advances! If I change the getCurrentWatermark() method to:

@Override
public Watermark getCurrentWatermark() {
    // Uses the machine's clock (processing time) instead of event timestamps
    return new Watermark(System.currentTimeMillis() - DELAY);
}

it solves the problem, but I don't want to use the machine's timestamp. How can I fix this?
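The stall follows from how a downstream task combines watermarks: it forwards the minimum over all of its inputs. A minimal plain-Java sketch of that rule, fed with the four values from this post (the class and method names are hypothetical, not Flink API):

```java
import java.util.Arrays;

/** Hypothetical model of how a downstream task combines the watermarks of its inputs. */
public class MinWatermark {
    /** The combined watermark is the minimum over all input channels (MIN_VALUE if there are none). */
    public static long combine(long... channelWatermarks) {
        return Arrays.stream(channelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        long combined = combine(1541217659806L, 1541217659810L, -3000L, -3000L);
        // The two silent channels pin the combined watermark at -3000, so no event-time
        // window or timer fires, however far channels 1 and 2 advance.
        System.out.println(combined); // -3000
    }
}
```

Returning `System.currentTimeMillis() - DELAY` from the silent slots lifts this minimum, but only because processing time then stands in for event time on those slots, which is exactly the trade-off the question wants to avoid.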

Re: Event time didn't advance because of some idle slots

Reza Samee
It's not a real solution, but why don't you change the parallelism of your `SourceFunction`?

--
رضا سامعی  | Reza Sameei | Software Developer | 09126662695

Re: Event time didn't advance because of some idle slots

Fabian Hueske-2
Hi,

If you are using a custom source, you can call SourceContext.markAsTemporarilyIdle() to indicate that a task is currently not producing new records [1].

Best, Fabian
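Conceptually, a subtask marked as idle is left out when downstream tasks take the minimum of their input watermarks, so the busy slots alone determine event time. A plain-Java model of that rule, assuming the idle status of each input channel is known (the names are hypothetical, and Flink's internal bookkeeping is more involved than this):

```java
import java.util.OptionalLong;

/** Hypothetical model of idle-aware watermark combination (not Flink's actual classes). */
public class IdleAwareMinWatermark {
    /**
     * Returns the minimum watermark over channels that are not idle,
     * or empty if every channel is idle (this sketch then forwards nothing).
     */
    public static OptionalLong combine(long[] watermarks, boolean[] idle) {
        if (watermarks.length != idle.length) {
            throw new IllegalArgumentException("one idle flag per channel is required");
        }
        OptionalLong result = OptionalLong.empty();
        for (int i = 0; i < watermarks.length; i++) {
            if (idle[i]) {
                continue; // idle channels do not hold back event time
            }
            long w = watermarks[i];
            result = OptionalLong.of(result.isPresent() ? Math.min(result.getAsLong(), w) : w);
        }
        return result;
    }

    public static void main(String[] args) {
        long[] wms = {1541217659806L, 1541217659810L, -3000L, -3000L};
        boolean[] idle = {false, false, true, true};
        System.out.println(combine(wms, idle).getAsLong()); // 1541217659806
    }
}
```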

Re: Event time didn't advance because of some idle slots

Hequn Cheng
Hi Soheil,

You can set parallelism to 1 to solve the problem. 

Re: Event time didn't advance because of some idle slots

vino yang

Re: Event time didn't advance because of some idle slots

vino yang

2018-07-31 17:56 GMT+08:00 Soheil Pourbafrani <[hidden email]>:
Hi vino,

Could you please show how to use markAsTemporarilyIdle() with a simple example?
Thanks
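Since the thread contains no worked example, here is a minimal plain-Java sketch of the pattern Fabian describes: a source loop that calls markAsTemporarilyIdle() when it has nothing to emit. To keep it runnable without Flink, `Context` is a hypothetical stand-in that only borrows the names of two SourceContext methods; in a real job the calls would go to the SourceContext passed to `SourceFunction.run()`, and the queue and `Event` type stand in for whatever the source actually reads.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class IdleMarkingSource {
    /** Hypothetical stand-in; only the method names are borrowed from Flink's SourceContext. */
    interface Context<T> {
        void collectWithTimestamp(T element, long timestamp);
        void markAsTemporarilyIdle();
    }

    /** Hypothetical record type: a payload plus its event timestamp. */
    static final class Event {
        final String payload;
        final long timestamp;

        Event(String payload, long timestamp) {
            this.payload = payload;
            this.timestamp = timestamp;
        }
    }

    private boolean idle = false;

    /** One iteration of the run loop: emit the next event, or declare idleness once. */
    void pollOnce(Queue<Event> input, Context<String> ctx) {
        Event next = input.poll();
        if (next == null) {
            if (!idle) {
                ctx.markAsTemporarilyIdle(); // downstream stops waiting on this subtask
                idle = true;
            }
            return;
        }
        // Flink's SourceContext treats a later collect/emitWatermark call as resuming activity.
        idle = false;
        ctx.collectWithTimestamp(next.payload, next.timestamp);
    }

    /** Runs three poll iterations and returns the calls the context received. */
    static List<String> runDemo() {
        List<String> log = new ArrayList<>();
        Context<String> ctx = new Context<String>() {
            @Override
            public void collectWithTimestamp(String element, long timestamp) {
                log.add("collect " + element + "@" + timestamp);
            }

            @Override
            public void markAsTemporarilyIdle() {
                log.add("idle");
            }
        };
        Queue<Event> input = new ArrayDeque<>();
        IdleMarkingSource source = new IdleMarkingSource();
        source.pollOnce(input, ctx);          // nothing available: mark idle
        source.pollOnce(input, ctx);          // still nothing: no repeated call
        input.add(new Event("a", 1000L));
        source.pollOnce(input, ctx);          // a record arrives: active again
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // [idle, collect a@1000]
    }
}
```

In a real source you would probably mark idleness only after some period without data rather than after a single empty poll, and emit records while holding `ctx.getCheckpointLock()`.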
