Kafka topic partition skewness causes watermark not being emitted

tao xiao, January 12, 2017
Hi team,

I have a topic with 2 partitions in Kafka. I produced all data to partition 0 and no data to partition 1. I created a Flink job with parallelism 1 that consumes that topic and counts the events with an event-time session window (5-second gap). It turned out that the session window was never closed, even when I sent a message with a 10-minute gap.

After digging into the source code: AbstractFetcher[1], which is responsible for sending watermarks downstream, calculates the minimum watermark over all partitions. Because partition 1 has no data, the watermark returned for partition 1 is always Long.MIN_VALUE, so AbstractFetcher never emits a watermark downstream.
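A simplified, stand-alone model of the min-watermark behaviour described above may help. It is plain Java with no Flink dependency; the class and method names are hypothetical, and it is not the actual AbstractFetcher code. It shows one partition stuck at Long.MIN_VALUE pinning the combined watermark, so a session window with a 5-second gap never reaches its firing point. The firing rule assumes Flink's event-time trigger semantics, where a window fires once the watermark reaches the window end minus 1 ms.

```java
/**
 * Simplified stand-alone model (not Flink's actual AbstractFetcher code) of a
 * source subtask that reads several Kafka partitions and emits the minimum of
 * the per-partition watermarks.
 */
final class MinWatermarkDemo {

    /** Combined watermark = minimum over every assigned partition's watermark. */
    static long combinedWatermark(long[] partitionWatermarks) {
        long min = Long.MAX_VALUE;
        for (long wm : partitionWatermarks) {
            min = Math.min(min, wm);
        }
        return min;
    }

    /**
     * A session window spanning [start, lastEventTs + gap) fires once the
     * watermark reaches its last timestamp, i.e. windowEnd - 1.
     */
    static boolean sessionWindowFires(long lastEventTs, long gapMs, long watermark) {
        long windowEnd = lastEventTs + gapMs;
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long gapMs = 5_000L;                       // the 5-second session gap from the post
        long lastEventTs = 1_000L;                 // last event of the open session
        long nextEventTs = lastEventTs + 600_000L; // next event 10 minutes later
        // Partition 0 has seen the later event; partition 1 has never seen data.
        long[] watermarks = {nextEventTs - 1, Long.MIN_VALUE};
        long combined = combinedWatermark(watermarks);
        System.out.println("combined watermark = " + combined); // -9223372036854775808
        System.out.println("window fires? " + sessionWindowFires(lastEventTs, gapMs, combined)); // false
    }
}
```

Dropping the Long.MIN_VALUE entry from the array (as if partition 1 were ignored) makes the combined watermark 600999, and the window fires.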

I want to know whether this is expected behavior or a bug. If it is expected, how do I avoid the delay in watermark emission when data is not evenly distributed across all partitions?

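This is the timestamp extractor I used:

import javax.annotation.Nullable;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Uses each SessionEvent's session start time as its event time and emits
// periodic watermarks trailing the highest timestamp seen so far by 1 ms.
public class ExactTimestampExtractor implements AssignerWithPeriodicWatermarks<SessionEvent> {

    private long currentMaxTimestamp = Long.MIN_VALUE;

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        // Until the first element arrives, report Long.MIN_VALUE (no progress).
        return new Watermark(currentMaxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentMaxTimestamp - 1);
    }

    @Override
    public long extractTimestamp(SessionEvent element, long previousElementTimestamp) {
        long eventStartTime = (long) element.get(SessionEvent.SESSION_START_DT);
        if (eventStartTime > currentMaxTimestamp) {
            currentMaxTimestamp = eventStartTime;
        }
        return eventStartTime;
    }
}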

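And this is the Flink topology:

// get input data
// `properties` is the Kafka consumer configuration (bootstrap.servers, group.id, ...).
// It is not shown here, but this constructor requires it as its third argument.
FlinkKafkaConsumer010<SessionEvent> consumer = new FlinkKafkaConsumer010<>(
        "topic4", new MyOwnSchema(), properties);
consumer.assignTimestampsAndWatermarks(new ExactTimestampExtractor());
DataStream<SessionEvent> input = env.addSource(consumer);

// Reducer and WindowFunction are presumably the job's own classes (not shown).
input
        .keyBy("id")
        .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
        .reduce(new Reducer(), new WindowFunction())
        .print();

// execute program
env.execute("a job");

I used the latest code from GitHub.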

Re: Kafka topic partition skewness causes watermark not being emitted

Tzu-Li (Gordon) Tai, January 13, 2017
Hi,

This is expected behaviour, given how per-partition watermarks are designed in the Kafka consumer. Still, I think it's a good idea to also handle idle partitions when the Kafka consumer itself emits watermarks. I've filed a JIRA issue for this: https://issues.apache.org/jira/browse/FLINK-5479.
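One way the idle-partition handling proposed in FLINK-5479 could work, as a stand-alone sketch: partitions that have produced nothing for a configurable timeout are left out of the minimum. The class name, the idleTimeoutMs parameter and this definition of idleness are assumptions for illustration, not the design Flink adopted.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical idleness-aware watermark combination: a partition with no
 * activity for at least idleTimeoutMs is excluded from the minimum, so it
 * cannot hold back the watermark of partitions that do have data.
 */
final class IdleAwareWatermarkTracker {
    private final long idleTimeoutMs;
    private final Map<Integer, Long> watermarkByPartition = new HashMap<>();
    private final Map<Integer, Long> lastActivityByPartition = new HashMap<>();

    IdleAwareWatermarkTracker(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    /** Registers a partition that has not produced any record yet. */
    void addPartition(int partition, long nowMs) {
        watermarkByPartition.put(partition, Long.MIN_VALUE);
        lastActivityByPartition.put(partition, nowMs);
    }

    /** Records a per-partition watermark (never moving it backwards) and marks the partition active. */
    void onWatermark(int partition, long watermark, long nowMs) {
        long previous = watermarkByPartition.getOrDefault(partition, Long.MIN_VALUE);
        watermarkByPartition.put(partition, Math.max(previous, watermark));
        lastActivityByPartition.put(partition, nowMs);
    }

    /**
     * Minimum watermark over partitions active within the idle timeout.
     * Returns Long.MIN_VALUE when every partition is idle, meaning the caller
     * has nothing new to emit.
     */
    long combinedWatermark(long nowMs) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Map.Entry<Integer, Long> entry : watermarkByPartition.entrySet()) {
            long lastActivity = lastActivityByPartition.get(entry.getKey());
            if (nowMs - lastActivity < idleTimeoutMs) {
                anyActive = true;
                min = Math.min(min, entry.getValue());
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }
}
```

With a 10-second timeout, the empty partition 1 from the original post would stop holding back the watermark 10 seconds after the job starts. The trade-off is that records arriving later on a previously idle partition may already be behind the emitted watermark and be treated as late.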

For the time being, I don’t think there will be an easy way to avoid this with the existing APIs, unfortunately. Is the skewed partition data intentional, or only for experimental purposes?

Best,
Gordon

Re: Kafka topic partition skewness causes watermark not being emitted

tao xiao
The case I described was only an experiment, but data skew does happen in production. The current implementation blocks watermark emission downstream until all partitions move forward, which has a great impact on latency. It may be a good idea to expose an API that lets users decide how best to control watermark emission.

Re: Kafka topic partition skewness causes watermark not being emitted

gerardg (Gerard Garcia), December 12, 2017
I'm also affected by this behavior. There are no updates in FLINK-5479, but did you manage to find a way to work around this?

Thanks,

Gerard



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Kafka topic partition skewness causes watermark not being emitted

Tzu-Li (Gordon) Tai, December 13, 2017
Hi,

I've just elevated FLINK-5479 to BLOCKER for 1.5.

Unfortunately, AFAIK there is no easy workaround for this issue in the releases so far.
The min watermark logic that controls per-partition watermark emission is hidden inside the consumer, which makes it hard to work around.

One possible solution I can imagine, though perhaps not a trivial one, is to periodically inject a special marker event into all partitions.
The watermark assigner would then recognize this marker and provide a watermark for it.
Another option is that I provide a patch you can apply to a custom build of the Kafka connector that handles partition idleness properly.
However, given that we're aiming for a faster release cycle for Flink 1.5 (proposed release date is Feb. 2018), the extra effort of maintaining a custom build on your side might not be worth it.
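The marker-event idea, sketched in plain Java under stated assumptions: the Event type, its heartbeat flag and the factory methods are hypothetical. The assigner applies the same max-timestamp-minus-1 rule as the ExactTimestampExtractor from the original post, but it recognizes a heartbeat and takes that event's timestamp from its own field, since a marker has no session start time. isData is meant to drop heartbeats before keyBy so they never reach the windows.

```java
/**
 * Hypothetical plain-Java model (no Flink dependency) of a periodic watermark
 * assigner that also recognizes heartbeat marker events.
 */
final class HeartbeatAwareExtractor {

    /** Minimal event type; the heartbeat flag is an assumed convention. */
    static final class Event {
        final String id;             // key of a real event, null for heartbeats
        final long sessionStartMs;   // event time of a real session event
        final long sentAtMs;         // time stamped on a heartbeat by its producer
        final boolean heartbeat;

        private Event(String id, long sessionStartMs, long sentAtMs, boolean heartbeat) {
            this.id = id;
            this.sessionStartMs = sessionStartMs;
            this.sentAtMs = sentAtMs;
            this.heartbeat = heartbeat;
        }

        static Event data(String id, long sessionStartMs) {
            return new Event(id, sessionStartMs, Long.MIN_VALUE, false);
        }

        static Event heartbeat(long sentAtMs) {
            return new Event(null, Long.MIN_VALUE, sentAtMs, true);
        }
    }

    private long currentMaxTimestamp = Long.MIN_VALUE;

    /** Same rule as ExactTimestampExtractor: highest timestamp seen minus 1 ms. */
    long currentWatermark() {
        return currentMaxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentMaxTimestamp - 1;
    }

    /** Recognizes heartbeats and reads their timestamp from the heartbeat field. */
    long extractTimestamp(Event event) {
        long ts = event.heartbeat ? event.sentAtMs : event.sessionStartMs;
        if (ts > currentMaxTimestamp) {
            currentMaxTimestamp = ts;
        }
        return ts;
    }

    /** Filter predicate: keep real events, drop heartbeats before keyBy/window. */
    static boolean isData(Event event) {
        return !event.heartbeat;
    }
}
```

Heartbeats should carry a timestamp no later than the event time of data that may still arrive on that partition; otherwise genuine events end up behind the watermark and are treated as late.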

Best,
Gordon


Re: Kafka topic partition skewness causes watermark not being emitted

gerardg (Gerard Garcia), December 13, 2017
Thanks Gordon. 

Don't worry, I'll be careful not to have empty partitions until the next release.
Also, I'll keep an eye on FLINK-5479, and if at some point there is a fix and the issue bothers us too much, I'll try to apply the patch myself to the latest stable release.

Gerard

Re: Kafka topic partition skewness causes watermark not being emitted

Juho Autio
A possible workaround while waiting for FLINK-5479, if someone is hitting the same problem: we chose to periodically send "heartbeat" messages to all topics and partitions in our Kafka cluster. We do that through the service that normally writes to our Kafka. This way every partition always has reasonably recent timestamps.
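A sketch of the producer side of this heartbeat workaround: one heartbeat per partition of every topic, all stamped with the same time. PartitionSender is a hypothetical stand-in for the client call that writes a record to an explicit partition, and the topic-to-partition-count map is an assumption about how the topic list is obtained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class HeartbeatSender {

    /** Hypothetical stand-in for the client call that writes one record to an explicit partition. */
    interface PartitionSender {
        void send(String topic, int partition, long heartbeatTimeMs);
    }

    /**
     * Sends one heartbeat to every partition of every topic and returns how
     * many were sent. partitionCounts maps topic name -> number of partitions.
     */
    static int sendRound(Map<String, Integer> partitionCounts, long nowMs, PartitionSender sender) {
        int sent = 0;
        for (Map.Entry<String, Integer> topic : partitionCounts.entrySet()) {
            for (int partition = 0; partition < topic.getValue(); partition++) {
                sender.send(topic.getKey(), partition, nowMs);
                sent++;
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitionCounts = new TreeMap<>();
        partitionCounts.put("topic4", 2);
        List<String> log = new ArrayList<>();
        sendRound(partitionCounts, 1_000L, (topic, partition, ts) -> log.add(topic + "-" + partition + "@" + ts));
        System.out.println(log); // [topic4-0@1000, topic4-1@1000]
    }
}
```

With the Kafka Java client, each send could be a ProducerRecord addressed to an explicit partition (the ProducerRecord(topic, partition, key, value) constructor), the partition count could come from KafkaProducer#partitionsFor, and sendRound could be scheduled with ScheduledExecutorService#scheduleAtFixedRate. On the consumer side, the heartbeats still have to be recognized by the timestamp assigner and filtered out before the windows.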
