Flink SQL - IntervalJoin doesn't support consuming update and delete - trying to deduplicate rows

4 messages

Dan
Hi!

I'm using Flink SQL to do an interval join.  Rows in one of the tables are not unique.  I'm fine using either the first or last row.  When I try to deduplicate and then interval join, I get the following error. 

IntervalJoin doesn't support consuming update and delete changes which is produced by node Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[log_user_id], orderBy=[ts ASC], select=[platform_id, user_id, log_user_id, client_log_ts, event_api_ts, ts])


Is there a way to combine these in this order? I could do the deduplication afterward instead, but that would result in more state.
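For context, a sketch of the kind of query that triggers this error (the `events` table name is an assumption; the partition and order columns come from the plan in the error message):

```sql
-- Deduplicate on log_user_id, keeping one row per key ordered by event time ts,
-- then feed the result into an interval join. Over an event-time attribute the
-- planner compiles this into the Rank node from the error message, which emits
-- updates/retractions rather than an append-only stream.
SELECT *
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY log_user_id ORDER BY ts ASC) AS row_num
  FROM events
)
WHERE row_num = 1;
```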

- Dan

Re: Flink SQL - IntervalJoin doesn't support consuming update and delete - trying to deduplicate rows

Arvid Heise-3
Hi Dan,

Which Flink version are you using? I know that there has been quite a bit of optimization of deduplication in 1.12, which would reduce the required state tremendously.
I'm pulling in Jark who knows more.

On Thu, Dec 31, 2020 at 6:54 AM Dan Hill <[hidden email]> wrote:


--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   

Re: Flink SQL - IntervalJoin doesn't support consuming update and delete - trying to deduplicate rows

Jark Wu-3
Hi Dan,

Sorry for the late reply. 

I guess you applied "deduplication keeping the last row" before the interval join?
That produces an updating stream, and the interval join only supports append-only input.
You can try "deduplication keeping the *first* row" before the interval join instead.
That should produce an append-only stream, which the interval join can consume.
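A sketch of the keep-first-row pattern (assuming the source table is named `events` and declares a processing-time attribute `proctime` in its DDL):

```sql
-- Keep the first row per log_user_id, ordered ascending by processing time.
-- ROW_NUMBER ... ORDER BY proctime ASC with row_num = 1 is recognized by the
-- planner as append-only deduplication, so downstream operators such as the
-- interval join can consume the result without update/delete changes.
SELECT platform_id, user_id, log_user_id, client_log_ts, event_api_ts, ts
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY log_user_id ORDER BY proctime ASC) AS row_num
  FROM events
)
WHERE row_num = 1;
```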

Best,
Jark



On Tue, 5 Jan 2021 at 20:07, Arvid Heise <[hidden email]> wrote:



Re: Flink SQL - IntervalJoin doesn't support consuming update and delete - trying to deduplicate rows

Dan
Hey, sorry for the late reply.  I'm using v1.11.1.

Cool. I implemented a non-SQL workaround that keeps the first row. I'll see if I can do the same in the SQL version.
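One possible shape for the combined query, following Jark's suggestion (a sketch only; the `events` and `views` table names, the `proctime` attribute, the join key, and the one-hour interval are all assumptions):

```sql
-- First-row deduplication (append-only), then an interval join on the result.
WITH deduped AS (
  SELECT platform_id, user_id, log_user_id, ts
  FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY log_user_id ORDER BY proctime ASC) AS row_num
    FROM events
  )
  WHERE row_num = 1
)
SELECT d.log_user_id, v.view_id, d.ts
FROM deduped d
JOIN views v
  ON d.log_user_id = v.log_user_id
 AND v.ts BETWEEN d.ts - INTERVAL '1' HOUR AND d.ts;
```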

On Wed, Jan 13, 2021 at 11:26 PM Jark Wu <[hidden email]> wrote:

