No output when using event time with multiple Kafka partitions

Yassin Marzouki
Hi everyone,

I am reading messages from a Kafka topic with 2 partitions and using event time. This is my code:

.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
    @Override
    public long extractAscendingTimestamp(Request req) {
        return req.ts;
    }
})
.windowAll(TumblingEventTimeWindows.of(Time.days(1)))
.apply((TimeWindow window, Iterable<Request> iterable, Collector<String> collector) -> {
    collector.collect("Window: " + window.toString());
    for (Request req : iterable) {
        collector.collect(req.toString());
    }
})
.print()

I get output only when I set the Kafka source parallelism to 1. I guess that is because messages from multiple partitions arrive out of order at the timestamp extractor, according to this thread. Is that correct?
So I replaced the AscendingTimestampExtractor with a BoundedOutOfOrdernessGenerator, as in the documentation example (with a higher delay), in order to handle out-of-order events, but I still can't get any output. Why is that?

Best,
Yassine
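
For reference, the idea behind the bounded out-of-orderness generator mentioned above can be sketched without any Flink dependency. The class and method names below are hypothetical and only mirror the logic, not the Flink API: track the highest timestamp seen so far and report a watermark that trails it by a fixed delay.

```java
/** Dependency-free sketch of a bounded out-of-orderness watermark (illustrative names, not Flink API). */
final class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMs;
    // Long.MIN_VALUE means "no element seen yet".
    private long currentMaxTimestamp = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Records the timestamp of an incoming element. */
    void onElement(long timestampMs) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestampMs);
    }

    /** The watermark trails the highest timestamp seen by the allowed delay. */
    long currentWatermark() {
        if (currentMaxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen, so event time cannot advance
        }
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch generator = new BoundedOutOfOrdernessSketch(5_000L);
        long[] timestamps = {10_000L, 12_000L, 11_000L, 20_000L};
        for (long ts : timestamps) {
            generator.onElement(ts);
            System.out.println("element=" + ts + " watermark=" + generator.currentWatermark());
        }
    }
}
```

Note that a generator of this kind only advances when elements actually arrive; an input that stays silent never moves its watermark forward.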

Re: No output when using event time with multiple Kafka partitions

Kostas Kloudas
Hi Yassine,

Could you just remove the window and the apply(), and just put a print() after the:
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
    @Override
    public long extractAscendingTimestamp(Request req) {
        return req.ts;
    }
})

This at least will tell us if reading from Kafka works as expected.

Kostas

(In reply to Yassin Marzouki's message of Jul 25, 2016, 3:39 PM.)

Re: No output when using event time with multiple Kafka partitions

Yassin Marzouki
Hi Kostas,

When I remove the window and the apply() and put print() after assignTimestampsAndWatermarks, the messages are printed correctly:

2> Request{ts=2015-01-01, 06:15:34:000}
2> Request{ts=2015-01-02, 16:38:10:000}
2> Request{ts=2015-01-02, 18:58:41:000}
2> Request{ts=2015-01-02, 19:10:00:000}
2> Request{ts=2015-01-02, 23:36:51:000}
2> Request{ts=2015-01-03, 17:38:47:000}
...

Strangely, though, only one task is used. If I set the source parallelism to 1 using env.addSource(kafka).setParallelism(1) (with the window and the apply() still removed), results are printed using all available slots (the number of CPU cores):

4> Request{ts=2015-01-01, 06:15:34:000}
4> Request{ts=2015-01-02, 16:38:10:000}
2> Request{ts=2015-01-02, 19:10:00:000}
4> Request{ts=2015-01-02, 23:36:51:000}
1> Request{ts=2015-01-02, 18:58:41:000}
2> Request{ts=2015-01-03, 17:38:47:000}
3> Request{ts=2015-01-03, 17:56:42:000}
...

Now if I keep the window and apply() without specifying the source parallelism, no messages are printed (only the regular Kafka consumer and Flink logs), and if the source parallelism is set to 1, messages are printed correctly:

1> Window: TimeWindow{start=1420070400000, end=1420156800000}
2> Request{ts=2015-01-01, 06:15:34:000}
1> Request{ts=2015-01-02, 16:38:10:000}
4> Request{ts=2015-01-02, 19:10:00:000}
3> Window: TimeWindow{start=1420156800000, end=1420243200000}
3> Request{ts=2015-01-02, 18:58:41:000}
2> Request{ts=2015-01-02, 23:36:51:000}
3> Window: TimeWindow{start=1420416000000, end=1420502400000}
2> Request{ts=2015-01-03, 17:38:47:000}
4> Window: TimeWindow{start=1420243200000, end=1420329600000}
1> Request{ts=2015-01-03, 17:56:42:000}
1> Request{ts=2015-01-05, 17:13:45:000}
4> Request{ts=2015-01-05, 01:25:55:000}
2> Request{ts=2015-01-05, 14:27:45:000}
...
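
The window boundaries in the output above are consistent with one-day tumbling windows aligned to multiples of the window size since the epoch. A minimal sketch of that arithmetic follows, assuming the printed timestamps are UTC and the window offset is zero; the class and method names are hypothetical, not Flink API.

```java
import java.time.Instant;

/** Sketch of tumbling-window alignment (assumes UTC timestamps and a zero window offset). */
final class TumblingWindowSketch {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    /** Start of the window of the given size that contains the timestamp. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2015-01-01T06:15:34Z").toEpochMilli();
        long start = windowStart(ts, DAY_MS);
        // Prints: TimeWindow{start=1420070400000, end=1420156800000}
        System.out.println("TimeWindow{start=" + start + ", end=" + (start + DAY_MS) + "}");
    }
}
```

A window with end=1420156800000 can only be emitted once the watermark has passed the end of that day, which is why the watermark matters for whether any output appears.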

(In reply to Kostas Kloudas's message of Wed, Jul 27, 2016, 1:41 PM.)

Re: No output when using event time with multiple Kafka partitions

Yassin Marzouki
I just tried playing with the source parallelism setting, and I got a very strange result:

If I specify the source parallelism using env.addSource(kafka).setParallelism(N), results are printed correctly for any number N except N=4. I guess that's related to the number of task slots, since I have 4 CPU cores, but what is the explanation for that?
So I suppose that if I don't specify the source parallelism, it is set automatically to 4. Isn't it supposed to be set to the number of topic partitions (= 2) by default?


(In reply to Yassin Marzouki's message of Wed, Jul 27, 2016, 2:33 PM.)

Re: No output when using event time with multiple Kafka partitions

Aljoscha Krettek
Hi,
when running in local mode the default parallelism is always the number of (possibly virtual) CPU cores. The parallelism of the sink is set before it gets a chance to find out how many Kafka partitions there are. I think the reason for the behavior you're observing is that only one of your two partitions ever receives elements and that thus the watermark does not advance for that partition. Could that be the case?

Cheers,
Aljoscha
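
The effect described above can be illustrated with a small, dependency-free simulation. It assumes, as a simplification, that an operator with several inputs takes the minimum of the latest watermarks received on each input channel as its own watermark, and that a window fires once that watermark reaches the window's last millisecond. The class and method names are hypothetical.

```java
import java.util.Arrays;

/** Sketch: a downstream watermark modeled as the minimum over all input channels. */
final class MinWatermarkSketch {
    private final long[] channelWatermarks;

    MinWatermarkSketch(int channels) {
        channelWatermarks = new long[channels];
        // Every channel starts without any watermark.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel; watermarks never move backwards. */
    void onWatermark(int channel, long watermarkMs) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermarkMs);
    }

    /** The slowest channel determines the operator's event time. */
    long combinedWatermark() {
        return Arrays.stream(channelWatermarks).min().getAsLong();
    }

    /** A window [start, end) may fire once the watermark reaches end - 1. */
    boolean windowCanFire(long windowEndMs) {
        return combinedWatermark() >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 1420156800000L; // end of the first daily window in this thread
        MinWatermarkSketch operator = new MinWatermarkSketch(2);

        operator.onWatermark(0, 1420243200000L); // channel 0 is far ahead
        System.out.println("one channel silent, can fire: " + operator.windowCanFire(windowEnd));

        operator.onWatermark(1, 1420156800000L); // channel 1 finally reports
        System.out.println("both channels active, can fire: " + operator.windowCanFire(windowEnd));
    }
}
```

In this model a single silent channel keeps the combined watermark at its initial value, so no event-time window can fire, no matter how far the other channel has advanced.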

(In reply to Yassin Marzouki's message of Wed, 27 Jul 2016, 14:58.)

Re: No output when using event time with multiple Kafka partitions

Yassin Marzouki
Hi Aljoscha,

Sorry for the late response, I was busy and couldn't make time to work on this again until now.
Indeed, it turns out that one of the partitions was not receiving elements. The reason is that the producer sticks to one partition for topic.metadata.refresh.interval.ms (which defaults to 10 minutes) before picking another partition at random. So I reduced topic.metadata.refresh.interval.ms, and I was able to get output as soon as the messages were produced.
I still have some questions about unclear behavior regarding parallelism and watermark assignment when one partition is empty, which I will ask in a new mailing thread.
Thanks a lot for your help!

Best,
Yassine
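
A minimal sketch of the producer-side change described above, assuming the old Kafka producer configuration (the one that reads topic.metadata.refresh.interval.ms). The broker address and the one-second interval are placeholders, not values from this thread, and the helper class is hypothetical.

```java
import java.util.Properties;

/** Sketch of old-style Kafka producer settings with a shorter metadata refresh interval. */
final class ProducerConfigSketch {

    /** Builds producer properties; brokerList and refreshIntervalMs are caller-supplied placeholders. */
    static Properties producerProps(String brokerList, long refreshIntervalMs) {
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", brokerList);
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        // For messages without a key, a shorter interval makes the producer
        // switch partitions more often (the default is 10 minutes).
        props.setProperty("topic.metadata.refresh.interval.ms", Long.toString(refreshIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("localhost:9092", 1_000L);
        System.out.println(props.getProperty("topic.metadata.refresh.interval.ms"));
    }
}
```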

(In reply to Aljoscha Krettek's message of Fri, Jul 29, 2016, 12:43 PM.)