is streaming outer join sending unnecessary traffic?

is streaming outer join sending unnecessary traffic?

kant kodali
Hi All,

I am doing a streaming outer join across four Kafka topics; let's call them sample1, sample2, sample3, sample4. Each of these test topics has just one string column. My query is this:

SELECT * FROM sample1 FULL OUTER JOIN sample2 ON sample1.f0 = sample2.f0 FULL OUTER JOIN sample3 ON sample2.f0 = sample3.f0 FULL OUTER JOIN sample4 ON sample3.f0 = sample4.f0

And here is how I send messages to those Kafka topics at various times.

At time t1, send a message "flink" to test-topic1

(true,flink,null,null,null) // Looks good

At time t2, send a message "flink" to test-topic4

(true,null,null,null,flink) // Looks good

At time t3, send a message "flink" to test-topic3

(false,null,null,null,flink) // Looks good
(true,null,null,flink,flink) // Looks good

At time t4, send a message "flink" to test-topic2

(false,flink,null,null,null) // Looks good
(false,null,null,flink,flink) // Looks good
(true,null,null,null,flink) // Redundant?
(false,null,null,null,flink) // Redundant?
(true,flink,flink,flink,flink) // Looks good

Assume t1 < t2 < t3 < t4. The two rows marked "Redundant?" seem redundant to me, although the end result is correct. I don't see the same behavior when I join only two topics. These redundant messages will lead to many extra database operations downstream, so is there any way to optimize this? I am using Flink 1.9, so I am not sure whether this is already fixed in 1.10.
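
For readers unfamiliar with the (true/false, ...) prefix: it is Flink's retract-stream encoding, where true adds a row and false retracts a previously emitted one. The following sketch (plain Python, not Flink API) folds the messages above into table state, showing that the end result is correct even though the "Redundant?" pair cancels itself out:

```python
from collections import Counter

def apply_retract_stream(messages, state=None):
    """Fold a Flink-style retract stream into current table state.
    Each message is (flag, row): True adds the row, False retracts it."""
    state = Counter() if state is None else state
    for is_add, row in messages:
        if is_add:
            state[row] += 1
        else:
            state[row] -= 1
            if state[row] == 0:
                del state[row]
    return state

# State after t1..t3, per the messages listed above.
state = apply_retract_stream([
    (True, ("flink", None, None, None)),    # t1
    (True, (None, None, None, "flink")),    # t2
    (False, (None, None, None, "flink")),   # t3
    (True, (None, None, "flink", "flink")), # t3
])

# The five messages observed at t4, including the "Redundant?" pair.
state = apply_retract_stream([
    (False, ("flink", None, None, None)),
    (False, (None, None, "flink", "flink")),
    (True, (None, None, None, "flink")),    # redundant add...
    (False, (None, None, None, "flink")),   # ...retracted immediately
    (True, ("flink", "flink", "flink", "flink")),
], state)

print(dict(state))  # only the fully joined row remains
```

The redundant add/retract pair nets to zero, so correctness is preserved; the cost is purely the extra traffic to the sink.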

Attached the code as well.

Thanks!
kant




Test.java (3K)
Re: is streaming outer join sending unnecessary traffic?

kant kodali
Sorry, fixed some typos in my previous message: the last batch should be "At time t4" (not t3), and assume t1 < t2 < t3 < t4.


On Tue, Jan 28, 2020 at 1:43 PM kant kodali <[hidden email]> wrote:

Re: is streaming outer join sending unnecessary traffic?

Till Rohrmann
Hi Kant,

I am not an expert on Flink's SQL implementation. Hence, I'm pulling in Timo and Jark, who might be able to help you with your question.

Cheers,
Till

On Tue, Jan 28, 2020 at 10:46 PM kant kodali <[hidden email]> wrote:

Re: is streaming outer join sending unnecessary traffic?

kant kodali
Wondering if anyone has had a chance to look through this, or should I create a JIRA?



On Wed, Jan 29, 2020 at 6:49 AM Till Rohrmann <[hidden email]> wrote:

Re: is streaming outer join sending unnecessary traffic?

Benchao Li
Hi kant,

Thanks for reporting the issue. I'd like to share some thoughts after digging into the source code [1] of the blink planner; the logic is the same in the legacy planner [2].

The main logic of FULL OUTER JOIN is:
if input record is accumulate
|  if input side is outer
|  |  if there are no matched rows on the other side, send +[record+null], state.add(record, 0)
|  |  if there are matched rows on the other side
|  |  |  if other side is outer
|  |  |  |  if the matched num of the matched row == 0, send -[null+other]
|  |  |  |  if the matched num of the matched row > 0, skip
|  |  |  |  otherState.update(other, old + 1)
|  |  |  endif
|  |  |  send +[record+other]s, state.add(record, other.size)
|  |  endif
|  endif
|  if input side is not outer
|  |  state.add(record)
|  |  if there are no matched rows on the other side, skip
|  |  if there are matched rows on the other side
|  |  |  if other side is outer
|  |  |  |  if the matched num of the matched row == 0, send -[null+other]
|  |  |  |  if the matched num of the matched row > 0, skip
|  |  |  |  otherState.update(other, old + 1)
|  |  |  endif
|  |  |  send +[record+other]s
|  |  endif
|  endif
endif

if input record is retract
|  state.retract(record)
|  if there are no matched rows on the other side
|  |  if input side is outer, send -[record+null]
|  endif
|  if there are matched rows on the other side, send -[record+other]s
|  |  if other side is outer
|  |  |  if the matched num of the matched row == 0, this should never happen!
|  |  |  if the matched num of the matched row == 1, send +[null+other]
|  |  |  if the matched num of the matched row > 1, skip
|  |  |  otherState.update(other, old - 1)
|  |  endif
|  endif
endif
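
To make the accumulate branch above concrete, here is a toy Python sketch (not Flink's actual classes; both sides outer, join key equals the record itself, retract handling omitted) that reproduces the two-topic behavior described in the thread:

```python
class FullOuterJoinSim:
    """Toy model of the accumulate branch of the streaming FULL OUTER JOIN
    logic sketched above. Both sides are outer; the join key is the record
    itself. This is an illustration, not Flink's implementation."""

    def __init__(self):
        self.state = {"L": [], "R": []}  # per side: [record, matched_count]
        self.out = []                    # emitted (is_add, left, right)

    def accumulate(self, side, record):
        other = "R" if side == "L" else "L"

        def emit(is_add, this_rec, other_rec):
            # order the pair as (left, right) regardless of input side
            if side == "L":
                self.out.append((is_add, this_rec, other_rec))
            else:
                self.out.append((is_add, other_rec, this_rec))

        matches = [e for e in self.state[other] if e[0] == record]
        if not matches:
            # no match on the other side yet: emit record padded with null
            emit(True, record, None)
            self.state[side].append([record, 0])
            return
        for entry in matches:
            if entry[1] == 0:
                # the other row was previously emitted null-padded: retract that
                emit(False, None, entry[0])
            entry[1] += 1
            emit(True, record, entry[0])
        self.state[side].append([record, len(matches)])

join = FullOuterJoinSim()
join.accumulate("L", "flink")  # like sending "flink" to the first topic
join.accumulate("R", "flink")  # like sending "flink" to the second topic
for msg in join.out:
    print(msg)
```

For a single join the retraction of the null-padded row is necessary and minimal; the redundancy kant observed only arises when these retractions cascade through downstream join operators.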

For just one Join Operator, the logic above is correct, and deals with all corner cases.
However, for your query, there are three Join Operators:
      Join3
      /    \
   Join2 T4
   /    \
Join1 T3
 /  \
T1 T2

At t4, after sending "flink" to test-topic2:
Join1 will first retract -[flink, null, null, null], then send +[flink, flink, null, null]
Join2 receives -[flink, null, null, null], will send -[flink, null, null, null]
Join2 receives +[flink, flink, null, null], will send -[null, null, flink, null], and +[flink, flink, flink, null]
Join3 receives -[flink, null, null, null], will send -[flink, null, null, null]
Join3 receives -[null, null, flink, null], will send -[null, null, flink, flink], and +[null, null, null, flink]
Join3 receives +[flink, flink, flink, null], will send -[null, null, null, flink], and +[flink, flink, flink, flink]

In my opinion this is expected behavior under the current design, and I can't see an easy way to eliminate the duplication.
Let's wait for Jark and Timo's opinions.


On Sun, Feb 2, 2020 at 5:18 AM, kant kodali <[hidden email]> wrote:
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]


Re: is streaming outer join sending unnecessary traffic?

Jark Wu-3
Hi Kant,

Benchao explained the reason in depth, thanks Benchao!

In a word, the results are as expected. All streaming operators work in a per-record manner and are only eventually consistent, which means users may observe transient intermediate values. An outer join generates additional retractions, and nested outer joins amplify the number of these intermediate values.

Currently, the blink planner provides a mini-batch optimization to reduce the visibility of intermediate results; however, mini-batch is only available in aggregate operators [1]. Mini-batch optimization for streaming joins is on the roadmap, but may not make it into 1.11.

A workaround for now is to customize your sink function: buffer the retraction and upsert messages in memory and reduce them before flushing to the external system.
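
The suggested workaround could look something like the sketch below (plain Python, a hypothetical sink; the flush callback stands in for the real external write): buffer messages, net out add/retract pairs per row, and only write the survivors.

```python
from collections import Counter

class BufferingSink:
    """Sketch of a sink that compacts a retract stream before flushing.
    Rows whose adds and retracts cancel out within one buffer window
    never reach the external system."""

    def __init__(self, flush_fn):
        self.buffer = Counter()   # row -> net count (+adds, -retracts)
        self.flush_fn = flush_fn  # called with a list of (row, net_count)

    def invoke(self, is_add, row):
        self.buffer[row] += 1 if is_add else -1

    def flush(self):
        # only rows with a non-zero net effect cause database operations
        survivors = [(row, n) for row, n in self.buffer.items() if n != 0]
        self.flush_fn(survivors)
        self.buffer.clear()

# Usage: feed the five messages kant observed at t4.
writes = []
sink = BufferingSink(writes.extend)
for is_add, row in [
    (False, ("flink", None, None, None)),
    (False, (None, None, "flink", "flink")),
    (True,  (None, None, None, "flink")),   # redundant pair:
    (False, (None, None, None, "flink")),   # nets to zero, never flushed
    (True,  ("flink", "flink", "flink", "flink")),
]:
    sink.invoke(is_add, row)
sink.flush()
print(writes)  # three net operations instead of five
```

The trade-off is latency: results become visible only at flush time, so a real implementation would flush on a size or time threshold (and, in Flink, on checkpoint).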

Best,
Jark



On Sun, 2 Feb 2020 at 11:44, Benchao Li <[hidden email]> wrote: