Duplicates in self join

Eric L Goodman
What is the best way to avoid or remove duplicates when joining a stream with itself?  I'm performing a streaming temporal triangle computation and the first part is to find triads of two edges of the form vertexA->vertexB and vertexB->vertexC (and there are temporal constraints where the first edge occurs before the second edge).  To do that, I have the following code:
DataStream<Triad> triads = edges.join(edges)
    .where(new DestKeySelector())
    .equalTo(new SourceKeySelector())
    .window(SlidingEventTimeWindows.of(
        Time.milliseconds(windowSizeMs),
        Time.milliseconds(slideSizeMs)))
    .apply(new EdgeJoiner(queryWindow));
However, when I look at the triads being built, there are two copies of each triad.
For example, if I create ten edges (time, source, target):

0.0, 4, 0
0.01, 1, 5
0.02, 3, 7
0.03, 0, 8
0.04, 0, 9
0.05, 4, 8
0.06, 4, 3
0.07, 5, 9
0.08, 7, 1
0.09, 9, 6


It creates the following triads (time1, source1, target1, time2, source2, target2). Note there are two copies of each.

0.0, 4, 0 0.03, 0, 8
0.0, 4, 0 0.03, 0, 8
0.0, 4, 0 0.04, 0, 9
0.0, 4, 0 0.04, 0, 9
0.01, 1, 5 0.07, 5, 9
0.01, 1, 5 0.07, 5, 9
0.02, 3, 7 0.08, 7, 1
0.02, 3, 7 0.08, 7, 1
0.04, 0, 9 0.09, 9, 6
0.04, 0, 9 0.09, 9, 6
0.07, 5, 9 0.09, 9, 6
0.07, 5, 9 0.09, 9, 6

I'm assuming this behavior has something to do with the joining of "edges" with itself.
I can provide more code if that would be helpful, but I believe I've captured the most salient portion. 




Dominik Wosiński
Hey, 
IMHO, the simplest way in your case would be to use the Evictor to evict duplicate values after the window is generated. Have a look at it here: https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.html

Best Regards,
Dominik.

Hequn Cheng
Hi Eric,

Can you change the sliding window to a tumbling window? The data in different sliding windows is likely to overlap.

Best, Hequn

Eric L Goodman
If I change it to a tumbling window, some of the results will be lost, since the pattern I'm matching has a temporal extent: if the pattern starts in one tumbling window and ends in the next, it won't be reported.  Based on the temporal length of the query, you can set the slide and window lengths of a sliding window to capture all the patterns, though as you note, you will get duplicates.

Fabian Hueske-2
Did you check the new interval join that was added with Flink 1.6.0 [1]?
It might be better suited because each record has its own boundaries based on its timestamp and the join window interval.

Best,
Fabian
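
To illustrate the per-record boundaries Fabian describes, here is a minimal plain-Java sketch of interval-join semantics applied to the ten example edges from the first post. It does not use Flink; the class name, the Edge type, and the 10-second maxGapMs bound are assumptions made up for the illustration (the real bound would come from the query window). In Flink 1.6 the corresponding call chain would be along the lines of keyBy(...).intervalJoin(...).between(...).process(...), which is not shown here.

```java
import java.util.ArrayList;
import java.util.List;

class IntervalJoinSketch {
    /** Hypothetical edge type: event time in milliseconds, source vertex, target vertex. */
    static final class Edge {
        final long timeMs;
        final int source;
        final int target;

        Edge(long timeMs, int source, int target) {
            this.timeMs = timeMs;
            this.source = source;
            this.target = target;
        }

        @Override
        public String toString() {
            return timeMs + "," + source + "," + target;
        }
    }

    /**
     * Pairs edges a->b and b->c where the second edge is strictly later than
     * the first and at most maxGapMs after it. Each first edge defines its own
     * interval, so a matching pair is produced exactly once.
     */
    static List<String> triads(List<Edge> edges, long maxGapMs) {
        List<String> out = new ArrayList<>();
        for (Edge first : edges) {
            for (Edge second : edges) {
                long gap = second.timeMs - first.timeMs;
                if (first.target == second.source && gap > 0 && gap <= maxGapMs) {
                    out.add(first + " " + second);
                }
            }
        }
        return out;
    }

    /** The ten edges from the original post, with times converted to milliseconds. */
    static List<Edge> exampleEdges() {
        List<Edge> edges = new ArrayList<>();
        edges.add(new Edge(0, 4, 0));
        edges.add(new Edge(10, 1, 5));
        edges.add(new Edge(20, 3, 7));
        edges.add(new Edge(30, 0, 8));
        edges.add(new Edge(40, 0, 9));
        edges.add(new Edge(50, 4, 8));
        edges.add(new Edge(60, 4, 3));
        edges.add(new Edge(70, 5, 9));
        edges.add(new Edge(80, 7, 1));
        edges.add(new Edge(90, 9, 6));
        return edges;
    }

    public static void main(String[] args) {
        // maxGapMs = 10 seconds is an assumed bound, not a value from the thread.
        for (String triad : triads(exampleEdges(), 10_000L)) {
            System.out.println(triad);
        }
    }
}
```

On the example data this yields the six distinct triads from the first post, one copy each. The nested loop is only for clarity; a streaming interval join keeps buffered records per key instead of rescanning all edges.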


Am Mo., 8. Okt. 2018 um 16:44 Uhr schrieb Eric L Goodman <[hidden email]>:
If I change it to a Tumbling window some of the results will be lost since the pattern I'm matching has a temporal extent, so if the pattern starts in one tumbling window and ends in the next, it won't be reported.  Based on the temporal length of the query, you can set the sliding window and the window lengths to capture all the patterns, though as you note, you will get duplicates.

On Mon, Oct 8, 2018 at 7:46 AM Hequn Cheng <[hidden email]> wrote:
Hi Eric,

Can you change Sliding window to Tumbling window? The data of different sliding window are likely overlap.

Best, Hequn

On Mon, Oct 8, 2018 at 3:35 PM Dominik Wosiński <[hidden email]> wrote:
Hey, 
IMHO, the simplest way in your case would be to use the Evictor to evict duplicate values after the window is generated. Have look at it here: https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.html

Best Regards,
Dominik.

pon., 8 paź 2018 o 08:00 Eric L Goodman <[hidden email]> napisał(a):
What is the best way to avoid or remove duplicates when joining a stream with itself?  I'm performing a streaming temporal triangle computation and the first part is to find triads of two edges of the form vertexA->vertexB and vertexB->vertexC (and there are temporal constraints where the first edge occurs before the second edge).  To do that, I have the following code:
DataStream<Triad> triads = edges.join(edges)
.where(new DestKeySelector())
.equalTo(new SourceKeySelector())
.window(SlidingEventTimeWindows.of(Time.milliseconds(windowSizeMs),
Time.milliseconds(slideSizeMs)))
.apply(new EdgeJoiner(queryWindow));
However, when I look at the triads being built, there are two copies of each triad.
For example, if I create ten edges (time, source, target):

0.0, 4, 0

0.01, 1, 5

0.02, 3, 7

0.03, 0, 8

0.04, 0, 9

0.05, 4, 8

0.06, 4, 3

0.07, 5, 9

0.08, 7, 1

0.09, 9, 6


It creates the following triads (time1, source1, target1, time2, source2, targe2). Note there are two copies of each.

0.0, 4, 0 0.03, 0, 8

0.0, 4, 0 0.03, 0, 8

0.0, 4, 0 0.04, 0, 9

0.0, 4, 0 0.04, 0, 9

0.01, 1, 5 0.07, 5, 9

0.01, 1, 5 0.07, 5, 9

0.02, 3, 7 0.08, 7, 1

0.02, 3, 7 0.08, 7, 1

0.04, 0, 9 0.09, 9, 6

0.04, 0, 9 0.09, 9, 6

0.07, 5, 9 0.09, 9, 6

0.07, 5, 9 0.09, 9, 6

I'm assuming this behavior has something to do with the joining of "edges" with itself.
I can provide more code if that would be helpful, but I believe I've captured the most salient portion. 




Reply | Threaded
Open this post in threaded view
|

Re: Duplicates in self join

Eric L Goodman
In reply to this post by Hequn Cheng
When I switched to using TumblingEventTimeWindows, it did remove the duplicates, which was somewhat surprising because with just 10 edges (spanning 0.1 seconds), they should all have fit within one window of the SlidingEventTimeWindows (20-second window, 10-second slide).
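
A rough way to see why a 20-second window with a 10-second slide gives exactly two copies: a sliding assigner places each record in size/slide windows, not one. The sketch below is plain Java rather than Flink code. The class and method names are made up for illustration, and the alignment rule (window starts at multiples of the slide, zero offset) is an assumption about how the event-time sliding assigner behaves.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingWindowSketch {
    /**
     * Start times of every window of length sizeMs, advancing by slideMs,
     * that contains timestampMs. Windows are assumed to start at multiples
     * of slideMs and to cover [start, start + sizeMs).
     */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before the timestamp.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk backwards while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Edge timestamps from the example: 0.00 s to 0.09 s, in milliseconds.
        for (long ts = 0; ts <= 90; ts += 10) {
            System.out.println(ts + " ms -> sliding " + windowStarts(ts, 20_000L, 10_000L)
                    + ", tumbling " + windowStarts(ts, 10_000L, 10_000L));
        }
    }
}
```

Under these assumptions every edge between 0 ms and 90 ms lands in both the window starting at -10000 ms and the one starting at 0 ms, so each matching pair is joined once per window. With size equal to slide (a tumbling window) each edge lands in a single window, which matches the observation that the duplicates disappear.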

On Mon, Oct 8, 2018 at 9:02 AM Hequn Cheng <[hidden email]> wrote:
Hi, 

I just want to verify my assumption that the duplicates are introduced by the sliding window rather than by the join. With a sliding window, a message can belong to multiple windows, so it will be joined multiple times.
If my assumption is correct, you can add a ProcessFunction after the join to remove the duplicates.

Best, Hequn
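
The core of the "distinct" step suggested above can be sketched in plain Java as a filter that remembers what it has already emitted. This is not a Flink ProcessFunction; the class name is hypothetical, and triads are represented as strings only to keep the example short. In a real streaming job the "seen" set would presumably live in keyed state and would need to be expired (for example with a timer) so that it does not grow without bound.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class DistinctSketch {
    /** Returns the input with later repeats of an already-seen value dropped, keeping order. */
    static <T> List<T> distinct(List<T> joined) {
        Set<T> seen = new HashSet<>();
        List<T> out = new ArrayList<>();
        for (T item : joined) {
            // Set.add returns false when the item was already present.
            if (seen.add(item)) {
                out.add(item);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Two of the duplicated triads from the first post.
        List<String> joined = Arrays.asList(
                "0.0, 4, 0 0.03, 0, 8",
                "0.0, 4, 0 0.03, 0, 8",
                "0.0, 4, 0 0.04, 0, 9",
                "0.0, 4, 0 0.04, 0, 9");
        for (String triad : distinct(joined)) {
            System.out.println(triad);
        }
    }
}
```

This relies on the triad type having value-based equals and hashCode. Note that it would also drop two genuinely distinct triads that happen to be equal, such as the same edge arriving twice in the input.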

Eric L Goodman
In reply to this post by Fabian Hueske-2
Interval join is exactly what I'm looking for.  Thanks for pointing it out!
