Hi All,
I am doing a streaming outer join from four topics in Kafka lets call them sample1, sample2, sample3, sample4. Each of these test topics has just one column which is of tuple string. my query is this SELECT * FROM sample1 FULL OUTER JOIN sample2 on sample1.f0=sample2.f0 FULL OUTER JOIN sample3 on sample2.f0=sample3.f0 FULL OUTER JOIN sample4 on sample3.f0=sample4.f0 And here is how I send messages to those Kafka topics at various times. At time t1 Send a message "flink" to test-topic1 (true,flink,null,null,null) // Looks good At time t2 Send a message "flink" to test-topic4 (true,null,null,null,flink) // Looks good At time t3 Send a message "flink" to test-topic3 (false,null,null,null,flink) // Looks good (true,null,null,flink,flink) //Looks good At time t3 Send a message "flink" to test-topic2 (false,flink,null,null,null) // Looks good (false,null,null,flink,flink) // Looks good (true,null,null,null,flink) // Redundant? (false,null,null,null,flink) // Redundant? (true,flink,flink,flink,flink) //Looks good Those two rows above seem to be redundant to be although the end result is correct. Doesn't see the same behavior if I join two topics. This unwanted message will lead to a lot of database operations underneath so any way to optimize this? I am using Flink 1.9 so not sure if this is already fixed in 1.10. Attached the code as well. Thanks! kant Test.java (3K) Download Attachment |
Sorry. fixed some typos. I am doing a streaming outer join from four topics in Kafka lets call them sample1, sample2, sample3, sample4. Each of these test topics has just one column which is of tuple string. my query is this SELECT * FROM sample1 FULL OUTER JOIN sample2 on sample1.f0=sample2.f0 FULL OUTER JOIN sample3 on sample2.f0=sample3.f0 FULL OUTER JOIN sample4 on sample3.f0=sample4.f0 And here is how I send messages to those Kafka topics at various times. At time t1 Send a message "flink" to test-topic1 (true,flink,null,null,null) // Looks good At time t2 Send a message "flink" to test-topic4 (true,null,null,null,flink) // Looks good At time t3 Send a message "flink" to test-topic3 (false,null,null,null,flink) // Looks good (true,null,null,flink,flink) //Looks good At time t4 Send a message "flink" to test-topic2 (false,flink,null,null,null) // Looks good (false,null,null,flink,flink) // Looks good (true,null,null,null,flink) // Redundant? (false,null,null,null,flink) // Redundant? (true,flink,flink,flink,flink) //Looks good Assume t1<t2<t3<t4 Those two rows above seem to be redundant to me although the end result is correct. Doesn't see the same behavior if I join two topics. These redundant messages can lead to a lot of database operations underneath so any way to optimize this? I am using Flink 1.9 so not sure if this is already fixed in 1.10. Attached the code as well. Thanks! kant On Tue, Jan 28, 2020 at 1:43 PM kant kodali <[hidden email]> wrote:
|
Hi Kant, I am not an expert on Flink's SQL implementation. Hence, I'm pulling in Timo and Jark who might help you with your question. Cheers, Till On Tue, Jan 28, 2020 at 10:46 PM kant kodali <[hidden email]> wrote:
|
Wondering if anyone had a chance to look through this or should I create the JIRA? On Wed, Jan 29, 2020 at 6:49 AM Till Rohrmann <[hidden email]> wrote:
|
Hi kant, Thanks for reporting the issue, I'd like to give some thoughts here after digging into the source code[1] in blink planner, logic is same with legacy planner[2]. The main logic of FULL OUTER JOIN is: if input record is accumulate
| if input side is outer
| | if there is no matched rows on the other side, send +[record+null], state.add(record, 0)
| | if there are matched rows on the other side
| | | if other side is outer
| | | | if the matched num in the matched rows == 0, send -[null+other]
| | | | if the matched num in the matched rows > 0, skip
| | | | otherState.update(other, old + 1)
| | | endif
| | | send +[record+other]s, state.add(record, other.size)
| | endif
| endif
| if input side not outer
| | state.add(record)
| | if there is no matched rows on the other side, skip
| | if there are matched rows on the other side
| | | if other side is outer
| | | | if the matched num in the matched rows == 0, send -[null+other]
| | | | if the matched num in the matched rows > 0, skip
| | | | otherState.update(other, old + 1)
| | | endif
| | | send +[record+other]s
| | endif
| endif
endif
if input record is retract
| state.retract(record)
| if there is no matched rows on the other side
| | if input side is outer, send -[record+null]
| endif
| if there are matched rows on the other side, send -[record+other]s
| | if other side is outer
| | | if the matched num in the matched rows == 0, this should never happen!
| | | if the matched num in the matched rows == 1, send +[null+other]
| | | if the matched num in the matched rows > 1, skip
| | | otherState.update(other, old - 1)
| | endif
| endif
endif
For just one Join Operator, the logic above is correct, and deals with all corner cases. However, for your query, there are three Join Operators: Join3 / \ Join2 T4 / \ Join1 T3 / \ T1 T2 At t4, after sending "flink" to test-topic2: Join1 will first retract -[flink, null, null, null], then send +[flink, flink, null, null] Join2 receives -[flink, null, null, null], will send -[flink, null, null, null] Join2 receives +[flink, flink, null, null], will send -[null, null, flink, null], and +[flink, flink, flink, null] Join3 receives -[flink, null, null, null], will send -[flink, null, null, null] Join3 receives -[null, null, flink, null], will send -[null, null, flink, flink], and +[null, null, null, flink] Join3 receives +[flink, flink, flink, null], will send -[null, null, null, flink], and +[flink, flink, flink, flink] In my personal opinion, it's normal behavior for current design, and I can't find an easy way to eliminate this duplication. Let's wait for Jark and Timo's opinions. kant kodali <[hidden email]> 于2020年2月2日周日 上午5:18写道:
Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
Hi Kant, Benchao explained the reason in depth, thanks Benchao! In a word, the results are as expected. That's because all the streaming operators are in per-record manner and eventually consistent. That means user may see some instantaneous intermediate values. The outer join will generate additional retractions and the nested outer join will amplify the number of intermediate values. Currently, blink planner provides mini-batch optimization to reduce the visibility of the intermediate results. However, the mini-batch is only available in aggregate operators [1]. The mini-batch optimization for streaming join is on the roadmap, but may not catch up 1.11. A workaround for now is that you can customize your sink function, buffer the retraction messages and upsert messages in memory, reduce them before flushing to external system. Best, Jark On Sun, 2 Feb 2020 at 11:44, Benchao Li <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |