watermark does not progress

watermark does not progress

John O

I am noticing that the watermark does not progress as expected when running locally in the IDE. It just stays at Long.MIN_VALUE.

I am using EventTime processing and have tried both of these timestamp extractors:

· assignAscendingTimestamps ...
· assignTimestampsAndWatermarks(BoundedOutOfOrdernessTimestampExtractor) ...

I have also configured the environment like so:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

If I run the job on a Flink cluster, I do see the watermark progress.

Is watermarking not supported in local mode?

Thanks

Jo


Re: watermark does not progress

vino yang
Hi John,

Watermarks should also work in local mode.
When you debug, you can set a breakpoint in the getCurrentWatermark method to check whether the method is reached and whether it behaves as you expect.
What is your source? If you post your code, the problem might be easier to locate.
In addition, for troubleshooting watermarks, you can also refer to this email [1].


Thanks, vino.

On Wed, Aug 15, 2018 at 9:44 AM, John O <[hidden email]> wrote:


Re: watermark does not progress

Fabian Hueske-2
Hi John,

Watermarks cannot make progress if you have stream partitions that do not carry any data.
What kind of source are you using?
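
To illustrate the point about partitions without data: an operator with several inputs can only advance its watermark to the minimum of what its inputs have reported. The following is a minimal plain-Java sketch of that rule, not Flink's actual implementation; the class and method names are made up for illustration.

```java
import java.util.Arrays;

// Simplified model: an operator with several input channels forwards the
// minimum of the latest watermark seen on each channel.
// Hypothetical names; this is not Flink code.
final class MinWatermarkSketch {
    // Combined watermark = minimum over all input channels.
    static long combinedWatermark(long[] channelWatermarks) {
        return Arrays.stream(channelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        long[] channels = new long[3];
        Arrays.fill(channels, Long.MIN_VALUE); // no channel has reported yet

        channels[0] = 1_000L; // only channel 0 receives data and advances
        System.out.println(combinedWatermark(channels) == Long.MIN_VALUE); // prints true

        channels[1] = 900L;   // once every channel reports, the minimum moves
        channels[2] = 1_200L;
        System.out.println(combinedWatermark(channels)); // prints 900
    }
}
```

A single input that never reports keeps the combined value at Long.MIN_VALUE, no matter how far the other inputs have advanced.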

Best,
Fabian

2018-08-15 4:25 GMT+02:00 vino yang <[hidden email]>:

RE: watermark does not progress

John O

I did some more testing.

Below is a pseudo version of my setup.

kafkaconsumer ->
assignTimestampsAndWatermarks(BoundedOutOfOrdernessTimestampExtractor) ->
process(print1 ctx.timerService().currentWatermark()) ->
keyBy(_.someProp) ->
process(print2 ctx.timerService().currentWatermark())

I am manually sending records with monotonically increasing event times to the Kafka topic.

In print1 I see the expected watermark, but print2 is always Long.MIN_VALUE.

It looks like keyBy wipes out the watermark.

Now, if I run the exact same code on a Flink cluster, print2 outputs the expected watermark.
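
For reference, the watermark that print1 reports from a bounded-out-of-orderness assigner trails the highest event timestamp seen so far by a fixed bound. A minimal plain-Java sketch of that idea follows; the class name and the 500 ms bound are assumptions for illustration, and this is not the Flink class itself.

```java
// Simplified model of a bounded-out-of-orderness watermark generator.
// Hypothetical class; it only mirrors the idea, not Flink's implementation.
final class BoundedWatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeen = Long.MIN_VALUE;
    private boolean sawEvent = false;

    BoundedWatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Track the highest event timestamp observed so far.
    void onEvent(long eventTimestampMs) {
        sawEvent = true;
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMs);
    }

    // Before any event arrives the watermark stays at Long.MIN_VALUE.
    long currentWatermark() {
        return sawEvent ? maxTimestampSeen - maxOutOfOrdernessMs : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch wm = new BoundedWatermarkSketch(500L);
        System.out.println(wm.currentWatermark() == Long.MIN_VALUE); // prints true
        wm.onEvent(1_000L);
        wm.onEvent(800L); // a late event does not move the watermark back
        System.out.println(wm.currentWatermark()); // prints 500
    }
}
```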

 

Jo

 

From: Fabian Hueske <[hidden email]>
Sent: Wednesday, August 15, 2018 2:07 AM

Re: watermark does not progress

Hequn Cheng
Hi John,

I guess the source data in your local setup differs from the data on the cluster. As Fabian said, some partitions probably don't carry any data.
As one option, you can set the job parallelism to 1 and check the result.
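
Why parallelism 1 is a useful check can be sketched in plain Java. The sketch assumes a simplified assignment rule (partition p is read by source subtask p % parallelism), which is an illustration rather than the connector's exact rule, and it assumes watermarks are assigned on the stream after the source. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class IdleSubtaskSketch {
    // Returns the source subtasks that never receive data when only one
    // partition carries records.
    // Assumed rule (illustration only): partition p -> subtask p % parallelism.
    static List<Integer> idleSubtasks(int partitionWithData, int parallelism) {
        int busySubtask = partitionWithData % parallelism;
        List<Integer> idle = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            if (subtask != busySubtask) {
                idle.add(subtask);
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        System.out.println(idleSubtasks(2, 4)); // prints [0, 1, 3]
        System.out.println(idleSubtasks(2, 1)); // prints []
    }
}
```

With parallelism 1 there is no idle source subtask left to hold the watermark back.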

Best, Hequn

On Wed, Aug 15, 2018 at 5:22 PM, John O <[hidden email]> wrote:


RE: watermark does not progress

John O

Just wanted to post an update on this.

 

The problem was my dataset. I was using a Kafka topic with multiple partitions but only generated data for a single key. This meant that in an environment with parallelism > 1, some source subtasks never received any data and therefore never emitted a watermark. After keyBy, the next operator has to choose which watermark to use from its multiple inputs (it takes the lowest value), so the watermark never progressed.
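
The first step of this explanation, that records with a single key all land in one partition, can be sketched as follows. The hash used here (String.hashCode) is only a stand-in, since Kafka's default partitioner uses a different hash function, and the class and key names are hypothetical.

```java
import java.util.Set;
import java.util.TreeSet;

final class SingleKeyPartitionSketch {
    // Stand-in for a keyed partitioner: the same key always maps to the
    // same partition. Not Kafka's actual hash function.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Collects the distinct partitions that a sequence of record keys touches.
    static Set<Integer> partitionsHit(String[] keys, int numPartitions) {
        Set<Integer> hit = new TreeSet<>();
        for (String key : keys) {
            hit.add(partitionFor(key, numPartitions));
        }
        return hit;
    }

    public static void main(String[] args) {
        String[] singleKey = {"sensor-1", "sensor-1", "sensor-1"};
        System.out.println(partitionsHit(singleKey, 4).size()); // prints 1
    }
}
```

However many records are sent, only one of the four partitions ever carries data, so the subtasks reading the other partitions stay silent.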

 

 

Jo

 

 

 

From: Hequn Cheng <[hidden email]>
Sent: Wednesday, August 15, 2018 6:38 AM

Re: watermark does not progress

Hequn Cheng
Hi Jo,

Thanks for letting us know.

Best, Hequn

On Fri, Aug 17, 2018 at 2:12 AM, John O <[hidden email]> wrote:
