Unexpected behavior from interval join in Flink


Unexpected behavior from interval join in Flink

Wouter Zorgdrager-2
Hi all,

I'm experiencing some unexpected behavior using an interval join in Flink.
I'm dealing with two data sets, let's call them X and Y. They are finite (10k elements), but I process them as DataStreams. The data needs to be joined for enrichment purposes. I use event time, and I know (because I generated the data myself) that the timestamp of an element in Y is always between -60 minutes and +30 minutes of the element with the same key in set X. Both datasets are in order (in terms of timestamps), equal in size, and share a common key, and parallelism is set to 1 throughout the whole program.

The code to join looks something like this:
xStream
      .assignAscendingTimestamps(_.date.getTime)
      .keyBy(_.commonKey)
      .intervalJoin(
        yStream
          .assignAscendingTimestamps(_.date.getTime)
          .keyBy(_.commonKey))
      .between(Time.minutes(-60), Time.minutes(30))
      .process(new ProcessJoinFunction[X, Y, String] {
        override def processElement(
            left: X,
            right: Y,
            ctx: ProcessJoinFunction[X, Y, String]#Context,
            out: Collector[String]): Unit = {
          out.collect(left + ":" + right)
        }
      })

However, about 30% of the data is not joined. Is there a proper way to debug this? For instance, with windows you can side-output late data. Is there a way to side-output unjoinable data?

Thx a lot,
Wouter



Re: Unexpected behavior from interval join in Flink

Wouter Zorgdrager-2
Does anyone have leads on this issue? I have been looking into the IntervalJoinOperator code, but that didn't really help. My intuition is that data is rejected because of lateness, but that still confuses me, since I'm sure that both datastreams have monotonically increasing timestamps.

Thx, Wouter


Re: Unexpected behavior from interval join in Flink

Fabian Hueske-2
Hi Wouter,

Not sure what is going wrong there, but something you could try is to use a custom watermark assigner that always returns a watermark of 0.
When a source is finished serving records, it emits a final Long.MAX_VALUE watermark.
Hence the join should consume all events and store them in state. Once both sources are finished, it would start to join the data and clean up the state.
This test would show whether there are any issues with late data.
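Something like this (untested) sketch would do, assuming the AssignerWithPeriodicWatermarks interface and the X type with a date field from your snippet (ZeroWatermarkAssigner is just a name for illustration):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Pins the watermark at 0 so no element can ever be considered late.
// Flink still emits a final Long.MAX_VALUE watermark once the source
// finishes, which triggers the buffered join and the state cleanup.
class ZeroWatermarkAssigner extends AssignerWithPeriodicWatermarks[X] {
  override def getCurrentWatermark: Watermark = new Watermark(0L)

  override def extractTimestamp(element: X, previousElementTimestamp: Long): Long =
    element.date.getTime
}

You would then swap assignAscendingTimestamps(_.date.getTime) for assignTimestampsAndWatermarks(new ZeroWatermarkAssigner) on the X input (and use an analogous assigner for Y).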

Best, Fabian


Re: Unexpected behavior from interval join in Flink

Wouter Zorgdrager-2
Hi Fabian, 

Thanks for your reply. I managed to resolve the issue. Actually, the behavior was not so unexpected: I mistakenly used xStream as the 'base' stream while I needed yStream as the 'base', i.e. yStream.element - 60 min <= xStream.element <= yStream.element + 30 min. Interchanging the two datastreams fixed the issue, as sketched below.
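For the archive, a sketch of the corrected join, with the streams swapped and the bounds unchanged (types as in the snippet from my first mail):

// yStream is now the 'base' (left) side, so between() is interpreted as:
// yStream.element - 60 min <= xStream.element <= yStream.element + 30 min
yStream
  .assignAscendingTimestamps(_.date.getTime)
  .keyBy(_.commonKey)
  .intervalJoin(
    xStream
      .assignAscendingTimestamps(_.date.getTime)
      .keyBy(_.commonKey))
  .between(Time.minutes(-60), Time.minutes(30))
  .process(new ProcessJoinFunction[Y, X, String] {
    override def processElement(
        left: Y,
        right: X,
        ctx: ProcessJoinFunction[Y, X, String]#Context,
        out: Collector[String]): Unit = {
      out.collect(left + ":" + right)
    }
  })

Equivalently, I could have kept xStream as the base and flipped the bounds to .between(Time.minutes(-30), Time.minutes(60)).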

Thanks anyway.

Cheers, Wouter

Re: Unexpected behavior from interval join in Flink

Fabian Hueske-2
Ah, that's great!
Thanks for letting us know :-)
