Records Are Never Emitted in a Tumbling Event Window When Each Key Only Has One Record

Records Are Never Emitted in a Tumbling Event Window When Each Key Only Has One Record

Joseph Lorenzini
Tue, Jun 8, 2021 at 10:13 PM

Hi all,

I have observed behavior when joining two keyed streams where events are never emitted. The source of each stream is a different Kafka topic. I am curious to know whether this is expected and whether there's a way to work around it.

I am using a tumbling event-time window. All records across the two Kafka topics occurred within the same 5-second window of time. Each Kafka topic has a single partition. For each Kafka topic, I configured the Flink Kafka consumer like so:


    consumer.assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<CustomersUnion>forBoundedOutOfOrderness(Duration.ofSeconds(10))
            .withIdleness(Duration.ofSeconds(10))
    );
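To make the watermark arithmetic behind this configuration concrete, here is a standalone Java sketch that approximates what a `forBoundedOutOfOrderness(Duration.ofSeconds(10))` strategy computes. It is modelled on my understanding of Flink's `BoundedOutOfOrdernessWatermarks` but does not use Flink at all; the class `BoundedWatermarkSketch` and its methods are hypothetical. The property that matters for this thread: the watermark trails the largest timestamp seen so far by the bound plus 1 ms, so it can only move when a record with a newer timestamp arrives.

```java
// Hypothetical, simplified model of bounded-out-of-orderness watermarking,
// written for illustration; this is not Flink's actual class.
final class BoundedWatermarkSketch {
    private final long boundMillis;
    // Largest event timestamp seen so far; initialised so that the very
    // first watermark is Long.MIN_VALUE instead of underflowing.
    private long maxTimestamp;

    BoundedWatermarkSketch(long boundMillis) {
        this.boundMillis = boundMillis;
        this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
    }

    // Every record can only raise the maximum, never lower it.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // What a periodic emit would publish: max timestamp minus bound minus 1 ms.
    long currentWatermark() {
        return maxTimestamp - boundMillis - 1;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch gen = new BoundedWatermarkSketch(10_000); // 10 s bound
        for (long ts : new long[] {1_000, 5_000, 3_000}) {
            gen.onEvent(ts);
            System.out.println("after " + ts + " ms -> watermark " + gen.currentWatermark());
        }
    }
}
```

In this sketch the out-of-order 3000 ms record leaves the watermark at -5001 ms; only a newer timestamp moves it forward.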

The tumbling window has a duration of 60 seconds. It happens to be the case that there is only a single event per join key. If I use a tumbling processing-time window, events are emitted as expected. If I ensure there are multiple events for a key, events are also emitted. However, with a single event per key in a tumbling event-time window, no events are emitted.

Is this expected, and if so, how do you handle this use case?

Thanks,

Joe

Privileged/Confidential Information may be contained in this message. If you are not the addressee indicated in this message (or responsible for delivery of the message to such person), you may not copy or deliver this message to anyone. In such case, you should destroy this message and kindly notify the sender by reply email. Please advise immediately if you or your employer does not consent to Internet email for messages of this kind. Opinions, conclusions and other information in this message that do not relate to the official business of my firm shall be understood as neither given nor endorsed by it.

Re: Records Are Never Emitted in a Tumbling Event Window When Each Key Only Has One Record

Arvid Heise
Wednesday, June 9, 2021 at 8:34 AM

Hi Joe,

Could you please check in the web UI whether the watermark is advancing past the join? The window operator will not trigger if it doesn't advance.

Which Flink version are you running?
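Arvid's point about triggering can be sketched in plain Java. This is a simplified, hypothetical model (the class `TumblingWindowSketch` and its methods are made up for illustration) of how a record is assigned to a 60-second tumbling event-time window and when that window may fire: only once the watermark reaches the window's last millisecond. Assuming a single record at 125000 ms and a bounded-out-of-orderness watermark of max timestamp minus 10 s minus 1 ms (114999 ms), the window [120000, 180000) cannot fire.

```java
// Hypothetical standalone sketch of tumbling event-time window assignment
// and the "fire when the watermark passes the end of the window" rule.
final class TumblingWindowSketch {
    // Start of the tumbling window containing ts, assuming a zero offset
    // (as with TumblingEventTimeWindows.of(size)) and non-negative timestamps.
    static long windowStart(long ts, long sizeMillis) {
        return ts - (ts + sizeMillis) % sizeMillis;
    }

    // Largest timestamp that still belongs to the window [start, start + size).
    static long windowMaxTimestamp(long ts, long sizeMillis) {
        return windowStart(ts, sizeMillis) + sizeMillis - 1;
    }

    // An event-time window fires once the watermark reaches its max timestamp.
    static boolean fires(long ts, long sizeMillis, long watermark) {
        return watermark >= windowMaxTimestamp(ts, sizeMillis);
    }

    public static void main(String[] args) {
        long size = 60_000; // 60 s tumbling window, as in the thread
        long ts = 125_000;  // hypothetical record timestamp in ms
        System.out.println(windowStart(ts, size));        // 120000
        System.out.println(windowMaxTimestamp(ts, size)); // 179999
        System.out.println(fires(ts, size, 114_999));     // false
        System.out.println(fires(ts, size, 179_999));     // true
    }
}
```

A processing-time window, by contrast, is driven by the machine clock, which keeps advancing whether or not new records arrive; that would be consistent with Joe's observation that the processing-time variant emits results.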

Re: Records Are Never Emitted in a Tumbling Event Window When Each Key Only Has One Record

Joseph Lorenzini

Hi Arvid,

I am on Flink 1.11.2.

The Flink job has five operators:

  1. Source from Kafka topic one: sent 14 records
  2. Source from Kafka topic two: sent 6 records
  3. Map: received 15 records/sent 14 records
  4. Map: received 6 records/sent 6 records
  5. Tumbling Window to Filesink: received 20 records/sent 0 records

The watermark is the same for the map operators and the tumbling window, which is to say that the watermark did not advance between the maps and the tumbling window.

Any idea why that might be happening? I did notice that the timestamps for all Kafka records are within a fraction of a second of one another. For example:

  • 2021-06-09T08:57:00.993-05:00
  • 2021-06-09T08:57:00.997-05:00

I also noted that some Kafka records in topic A have the exact same timestamp as records in topic B.

Could timestamps that are very close together (e.g., only milliseconds apart), or records from the two sources having the exact same timestamp, cause the watermarks not to advance?
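A rough sketch suggests that close or identical timestamps are not the cause. It assumes that the join's window operator takes the minimum of its two inputs' watermarks (inputs marked idle by `withIdleness` are left out of that minimum, but idleness on its own does not push the watermark forward) and plugs in the two timestamps quoted above. `JoinWatermarkSketch` and its helpers are hypothetical and simplified, not Flink code.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.List;

// Hypothetical sketch: how a two-input operator such as the window join
// combines the watermarks of its inputs. Simplified; not Flink code.
final class JoinWatermarkSketch {
    static final long BOUND_MS = 10_000;  // forBoundedOutOfOrderness(10 s)
    static final long WINDOW_MS = 60_000; // 60 s tumbling window

    static long millis(String isoTimestamp) {
        return OffsetDateTime.parse(isoTimestamp).toInstant().toEpochMilli();
    }

    // Per-input watermark: largest timestamp seen minus the bound minus 1 ms.
    static long inputWatermark(List<Long> timestamps) {
        long max = Long.MIN_VALUE + BOUND_MS + 1;
        for (long ts : timestamps) max = Math.max(max, ts);
        return max - BOUND_MS - 1;
    }

    // The operator's watermark is the minimum over its active inputs, so equal
    // timestamps on both sides are harmless; the slowest input decides.
    static long combinedWatermark(long left, long right) {
        return Math.min(left, right);
    }

    // Last millisecond of the zero-offset tumbling window containing ts.
    static long windowMaxTimestamp(long ts) {
        long start = ts - (ts + WINDOW_MS) % WINDOW_MS;
        return start + WINDOW_MS - 1;
    }

    public static void main(String[] args) {
        long a = millis("2021-06-09T08:57:00.993-05:00");
        long b = millis("2021-06-09T08:57:00.997-05:00");
        // Assumed: topic A saw both timestamps, topic B saw the identical .997 one.
        long wm = combinedWatermark(inputWatermark(List.of(a, b)), inputWatermark(List.of(b)));
        long windowMax = windowMaxTimestamp(a);
        System.out.println("watermark  = " + Instant.ofEpochMilli(wm));
        System.out.println("window max = " + Instant.ofEpochMilli(windowMax));
        System.out.println("fires? " + (wm >= windowMax));
    }
}
```

Run as-is, this prints a watermark of 2021-06-09T13:56:50.996Z against a window whose last millisecond is 2021-06-09T13:57:59.999Z (UTC), so `fires?` is `false`. Under these assumptions the gap between the two timestamps is irrelevant; what is missing is a later record.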

Joe

Re: Records Are Never Emitted in a Tumbling Event Window When Each Key Only Has One Record

Joseph Lorenzini
Wed, Jun 9, 2021 at 8:49 PM

Hi Arvid,

I may have figured out the problem.

When using a tumbling window on a keyed stream with event time, does time only move forward when there's an event with a newer timestamp? Put another way, with a 5-second tumbling window, the job would need to consume at least two events before a computation occurs: the first event has a timestamp that falls within the 5-second window, and the second event has a timestamp that exceeds the max timestamp of that window.
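One hedged way to check this reasoning is to replay records one at a time against a single input. The sketch below (hypothetical class `SecondEventSketch`, plain Java, no Flink) assumes the bounded-out-of-orderness formula `watermark = maxTimestamp - bound - 1` and a window that fires when the watermark reaches its last millisecond. Under those assumptions, with the 10-second bound from the consumer configuration, the second event has to be at least 10 s past the end of the window, not merely past it. In the two-topic join, both inputs would have to get that far, unless one of them has been marked idle.

```java
// Hypothetical sketch: replaying records one by one to see which record
// finally moves the watermark past the end of the first window.
final class SecondEventSketch {
    // Smallest timestamp a later record must carry so that the watermark
    // (maxTs - bound - 1) reaches windowEnd - 1, i.e. ts >= windowEnd + bound.
    static long earliestFiringTimestamp(long windowEndExclusive, long boundMillis) {
        return windowEndExclusive + boundMillis;
    }

    // Index of the first record after which the window
    // [windowStart, windowStart + size) can fire, or -1 if it never does.
    static int firstFiringRecord(long[] timestamps, long windowStart, long sizeMillis, long boundMillis) {
        long windowMax = windowStart + sizeMillis - 1;
        long maxSeen = Long.MIN_VALUE + boundMillis + 1;
        for (int i = 0; i < timestamps.length; i++) {
            maxSeen = Math.max(maxSeen, timestamps[i]);
            long watermark = maxSeen - boundMillis - 1;
            if (watermark >= windowMax) return i;
        }
        return -1;
    }

    public static void main(String[] args) {
        long size = 5_000, bound = 10_000; // 5 s window, 10 s bound
        System.out.println(earliestFiringTimestamp(5_000, bound));                            // 15000
        System.out.println(firstFiringRecord(new long[] {1_200}, 0, size, bound));             // -1
        System.out.println(firstFiringRecord(new long[] {1_200, 6_000}, 0, size, bound));      // -1
        System.out.println(firstFiringRecord(new long[] {1_200, 6_000, 15_000}, 0, size, bound)); // 2
    }
}
```

In this model a second record at 6000 ms is inside the next window but still does not close [0, 5000); only the record at 15000 ms does.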

Does that sound right?

Thanks,

Joe

Re: Records Are Never Emitted in a Tumbling Event Window When Each Key Only Has One Record

Arvid Heise

Hi Joe,

Yes, that is correct! Only when a new record arrives and we know its timestamp can we deduce the watermark and advance it. The window operator then closes the old window and opens a new one.

Sorry I didn't see that immediately. It's sometimes hard to think in terms of individual records when you are used to thinking in millions.
