Slow watermark advances

Chengzhi Zhao
Hi Flink community,

I have an issue with slow watermark advancement and need some help. Here is what happens: I have two streams, A and B. They are connected and joined in a co-process function, and A also feeds a separate output on its own.

A --> Output
B --> (Connect A) --> Output

I use BoundedOutOfOrdernessGenerator [1] on both streams A and B, with a 2-hour delay. The low watermark of A and of its output sink stays within the 2-hour window; the co-process operator, however, ends up with a low watermark about 10 hours behind.

My setup uses the file system as the source: every 15 minutes, files are dropped into a directory and Flink picks them up from there.

Please advise. I appreciate it in advance!
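For readers unfamiliar with this kind of generator, here is a minimal plain-Scala sketch of the idea behind it: the watermark trails the largest timestamp seen so far by a fixed delay. The class name `BoundedOutOfOrdernessModel` and the sample timestamps are hypothetical, and the code does not use the Flink API.

```scala
// Plain-Scala model of a bounded-out-of-orderness watermark generator.
// It mirrors the idea only; it does not use or depend on the Flink API.
final class BoundedOutOfOrdernessModel(maxOutOfOrdernessMs: Long) {
  // Largest event timestamp observed so far.
  private var currentMaxTimestamp: Long = Long.MinValue

  // Record an element's event time and return it unchanged.
  def extractTimestamp(eventTimeMs: Long): Long = {
    currentMaxTimestamp = math.max(eventTimeMs, currentMaxTimestamp)
    eventTimeMs
  }

  // The watermark trails the largest timestamp seen by the allowed delay.
  def currentWatermark: Long =
    if (currentMaxTimestamp == Long.MinValue) Long.MinValue
    else currentMaxTimestamp - maxOutOfOrdernessMs
}

object BoundedOutOfOrdernessDemo {
  val HourMs: Long = 60L * 60 * 1000

  def main(args: Array[String]): Unit = {
    val gen = new BoundedOutOfOrdernessModel(2 * HourMs)
    gen.extractTimestamp(10 * HourMs) // hypothetical event at hour 10
    gen.extractTimestamp(9 * HourMs)  // hypothetical out-of-order event at hour 9
    println(s"watermark at hour ${gen.currentWatermark / HourMs}")
  }
}
```

With a 2-hour delay, a stream whose newest event is from hour 10 has its watermark at hour 8, and the out-of-order event from hour 9 does not move it backwards.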


Best,
Chengzhi

Re: Slow watermark advances

Xingcan Cui
Hi Chengzhi,

Currently, the watermarks of the two inputs of a connected stream are forcibly synchronized: the operator's watermark is the minimum of the two input watermarks, i.e., it is decided by the stream with the larger delay. Window triggers and timers downstream are therefore affected by this mechanism as well.

As a workaround, you could try to add (or subtract) a static time offset to the timestamps of one of your streams, which can bring the two watermarks “closer” to each other.
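A minimal sketch of this synchronization, written in plain Scala rather than against the Flink API: the operator keeps one watermark per input and exposes the smaller of the two. The class name `TwoInputWatermarkModel` and the hour values are hypothetical, chosen to resemble the situation described in the question.

```scala
// Plain-Scala model: a two-input operator tracks one watermark per input
// and advances its own watermark to the minimum of the two.
final class TwoInputWatermarkModel {
  private var watermark1: Long = Long.MinValue
  private var watermark2: Long = Long.MinValue

  // Watermarks never move backwards, so keep the maximum seen per input.
  def onWatermark1(wm: Long): Unit = { watermark1 = math.max(watermark1, wm) }
  def onWatermark2(wm: Long): Unit = { watermark2 = math.max(watermark2, wm) }

  // The operator's event-time clock is held back by the slower input.
  def combined: Long = math.min(watermark1, watermark2)
}

object TwoInputWatermarkDemo {
  val HourMs: Long = 60L * 60 * 1000

  def main(args: Array[String]): Unit = {
    val op = new TwoInputWatermarkModel
    op.onWatermark1(22 * HourMs) // hypothetical: stream A is 2 hours behind hour 24
    op.onWatermark2(14 * HourMs) // hypothetical: stream B is 10 hours behind hour 24
    println(s"combined watermark at hour ${op.combined / HourMs}")
  }
}
```

In this model the combined watermark sits at hour 14, even though stream A alone has already reached hour 22.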
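To illustrate the effect of such an offset, here is a small plain-Scala calculation. It assumes each stream's watermark is its largest timestamp minus the configured delay and that the connected operator takes the minimum of both; the object name `StaticOffsetSketch` and all hour values are hypothetical.

```scala
object StaticOffsetSketch {
  val HourMs: Long = 60L * 60 * 1000

  // Watermark of one stream: largest (shifted) timestamp minus the allowed delay.
  def streamWatermark(maxEventTimeMs: Long, offsetMs: Long, delayMs: Long): Long =
    (maxEventTimeMs + offsetMs) - delayMs

  // Watermark of the connected operator: the minimum over both inputs.
  def combinedWatermark(wmA: Long, wmB: Long): Long = math.min(wmA, wmB)

  def main(args: Array[String]): Unit = {
    val delay = 2 * HourMs
    val maxA = 24 * HourMs // hypothetical: A has seen events up to hour 24
    val maxB = 16 * HourMs // hypothetical: B's newest event is from hour 16

    val wmA = streamWatermark(maxA, 0L, delay)
    val before = combinedWatermark(wmA, streamWatermark(maxB, 0L, delay))
    // Shift B's timestamps forward by a static 8 hours.
    val after = combinedWatermark(wmA, streamWatermark(maxB, 8 * HourMs, delay))
    println(s"before: hour ${before / HourMs}, after: hour ${after / HourMs}")
  }
}
```

With these numbers the combined watermark moves from hour 14 to hour 22. Note that the offset changes the event times the join sees, so it only makes sense when the shifted timestamps are still meaningful for the pipeline.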

Best,
Xingcan

Re: Slow watermark advances

Chengzhi Zhao
Hi Xingcan,

Thanks for your quick response; I understand it better now. To clarify, do you mean I should add a static offset when I override the extractTimestamp function?

For example, 

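override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
    // Shift this stream's event time forward by a static offset of 1 hour (in milliseconds).
    val timestamp = element.getCreationTime() + 3600000L
    // Track the largest timestamp seen so far; the watermark is derived from it.
    currentMaxTimestamp = math.max(timestamp, currentMaxTimestamp)
    timestamp
}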

Appreciate your help!

Best,
Chengzhi


Re: Slow watermark advances

Xingcan Cui
Yes, Chengzhi. That’s exactly what I mean. But you should be careful with the semantics of your pipeline, since the offset changes the event times the join sees. The problem cannot be gracefully solved if there’s a natural time offset between the two streams.

Best, Xingcan
