Hello folks,
I am looking to enrich rows from an unbounded streaming table by joining it with a bounded static table while preserving the rowtime of the streaming table. For example, consider two tables F and D, where F is unbounded and D is bounded. The schema for the two tables is the following:

F:
|-- C0: BIGINT
|-- C1: STRING
|-- R: TIMESTAMP(3) **rowtime**
|-- WATERMARK FOR R: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS

D:
|-- C0: BIGINT
|-- C1: STRING NOT NULL

I'd like to run the following query on this schema:

select sum(F.C0), D.C1, tumble_start(F.R, interval '1' second)
from F join D ON F.C1 = D.C1
group by D.C1, tumble(F.R, interval '1' second)

However, I run into the following error when running the above query:

"Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before."

My understanding from reading the docs is that an event-time temporal join is meant to solve this problem. So I model table D as follows:

D:
|-- C0: BIGINT
|-- C1: STRING NOT NULL
|-- R: TIMESTAMP(3)
|-- WATERMARK FOR R: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS
|-- CONSTRAINT 2da2dd2e-9937-48cb-9dec-4f6055713004 PRIMARY KEY (C1)

with column D.R always set to 0, and modify the query as follows:

select sum(F.C0), D.C1, tumble_start(F.R, interval '1' second)
from F join D FOR SYSTEM_TIME AS OF F.R ON F.C1 = D.C1
group by D.C1, tumble(F.R, interval '1' second)

The above query runs but does not return any result. I have the following data in D initially:

Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
Emit D row=+I(2,"2",1970-01-01T00:00)@time=0
Emit D watermark=0

And F streams the following rows:

Emit F row=+I(0,"0",1970-01-01T00:00)@time=0
Emit F row=+I(1,"1",1970-01-01T00:00:10)@time=1000
Emit F watermark=1000

I expect the two rows in F to join with the matching rows (on C1) in D and produce some output, but I do not see anything in the output.
So I have the following questions:

1. Is an event-time temporal join the correct tool to solve this problem?
2. What could be the reason for not getting any output rows in the result?

Thanks,
Satyam
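Regarding question 1, the lookup that an event-time temporal join (FOR SYSTEM_TIME AS OF F.R) performs can be modeled as: for each F row, take the D row with the same key whose rowtime is the latest one not after F.R. The Java sketch below is a simplified, hypothetical model of that lookup only. The class, method and field names are made up, it ignores watermarks and when results are emitted, and it is not Flink's implementation. Under this model, both F rows from the question find a matching D version, which is the output expected above.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified model of the version lookup behind
// "F JOIN D FOR SYSTEM_TIME AS OF F.R". It ignores watermarks and emission
// timing and is NOT Flink's implementation; all names are made up.
class VersionLookup {
    // One version (row) of table D: payload C0, key C1, version time R in epoch ms.
    static final class DRow {
        final long c0;
        final String c1;
        final long r;

        DRow(long c0, String c1, long r) {
            this.c0 = c0;
            this.c1 = c1;
            this.r = r;
        }
    }

    // Returns the D row with the same key whose rowtime is the largest one <= fR,
    // i.e. the version of D that was valid at the F row's rowtime.
    static Optional<DRow> versionAsOf(List<DRow> d, String c1, long fR) {
        DRow best = null;
        for (DRow row : d) {
            boolean sameKey = row.c1.equals(c1);
            if (sameKey && row.r <= fR && (best == null || row.r > best.r)) {
                best = row;
            }
        }
        return Optional.ofNullable(best);
    }

    public static void main(String[] args) {
        // Table D from the question: every version has rowtime 0.
        List<DRow> d = List.of(new DRow(0, "0", 0), new DRow(1, "1", 0), new DRow(2, "2", 0));
        // F rows from the question: C1 = "0" at R = 0 and C1 = "1" at R = 1970-01-01T00:00:10.
        System.out.println(versionAsOf(d, "0", 0).map(v -> v.c0).orElse(-1L));      // 0
        System.out.println(versionAsOf(d, "1", 10_000).map(v -> v.c0).orElse(-1L)); // 1
    }
}
```

Because the lookup is "as of" F.R, an F row whose rowtime is earlier than every version of its key in D finds nothing, which matters once D's rowtime is no longer 0.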
Hello folks,

I would love to hear your feedback on this.

Regards,
Satyam

On Wed, Mar 10, 2021 at 6:53 PM Satyam Shekhar wrote:
Hi Satyam,
First of all, your initial join query can also work; you just need to make sure that no time attribute is in the SELECT clause. As the exception indicates, you need to cast all time attributes to TIMESTAMP. The reason for this is a major design limitation, also explained here, namely that a time attribute must not be in the output of a regular join: https://stackoverflow.com/a/64500296/806430

However, since you would like to perform the join "time-based", either an interval join or a temporal join might solve your use case.

In your case, I guess the watermark strategy of D is the problem. Are you sure the result is:

> Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
> Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
> Emit D row=+I(2,"2",1970-01-01T00:00)@time=0
> Emit D watermark=0

and not:

> Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
> Emit D watermark=0
> Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
> Emit D row=+I(2,"2",1970-01-01T00:00)@time=0

Or maybe the watermark is even dropped. Could you try a watermark strategy of `R` - INTERVAL '0.001' SECONDS?

I hope this helps.

Regards,
Timo

On 16.03.21 04:37, Satyam Shekhar wrote:
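Timo's suspicion can be checked against Flink's watermark contract: a Watermark(t) declares that no more records with timestamp <= t are expected, so a row with rowtime 0 that arrives after Watermark(0) is late. The Java sketch below is a hypothetical model (names invented, not a Flink API) that replays the two orderings above and the 1 ms offset he suggests. It only classifies rows as late or on time; it does not model what the temporal join then does with late rows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of Flink's watermark contract: after Watermark(t), records with
// timestamp <= t are late. Event encoding and names are made up for illustration.
class LatenessCheck {
    // An emitted event: either a row carrying a rowtime, or a watermark.
    static final class Event {
        final boolean isWatermark;
        final long time;

        private Event(boolean isWatermark, long time) {
            this.isWatermark = isWatermark;
            this.time = time;
        }

        static Event row(long rowtime) { return new Event(false, rowtime); }
        static Event watermark(long wm) { return new Event(true, wm); }
    }

    // Returns the rowtimes of rows that arrive with timestamp <= the current watermark.
    static List<Long> lateRows(List<Event> stream) {
        long currentWm = Long.MIN_VALUE;
        List<Long> late = new ArrayList<>();
        for (Event e : stream) {
            if (e.isWatermark) {
                currentWm = Math.max(currentWm, e.time); // watermarks never move backwards
            } else if (e.time <= currentWm) {
                late.add(e.time);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        // Ordering reported for D: all rows first, then the watermark.
        List<Event> reported = List.of(Event.row(0), Event.row(0), Event.row(0), Event.watermark(0));
        // Alternative ordering Timo asks about: Watermark(0) right after the first row.
        List<Event> suspected = List.of(Event.row(0), Event.watermark(0), Event.row(0), Event.row(0));
        // Same ordering, but the watermark derived from R = 0 is 0 - 1 ms = -1.
        List<Event> withOffset = List.of(Event.row(0), Event.watermark(-1), Event.row(0), Event.row(0));
        System.out.println(lateRows(reported));   // []
        System.out.println(lateRows(suspected));  // [0, 0]
        System.out.println(lateRows(withOffset)); // []
    }
}
```

Subtracting 0.001 seconds turns a watermark of 0 into -1 ms, so rows whose rowtime equals 0 are no longer at or behind the watermark, even if it is emitted early.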
Hi Timo,

Apologies for the late response; I somehow missed your reply.

I do want the join to be "time-based" since I need to perform a tumble grouping operation on top of the join.

I tried setting the watermark strategy to `R` - INTERVAL '0.001' SECONDS, but that didn't help either.

Note that we have a custom connector to an internal storage engine. The connector implements the ScanTableSource interface with the SupportsWatermarkPushDown ability. Would the watermark strategy in the table schema matter in that case?

I changed the query to the following to simplify it further:

select F.C0, F.C1, F.R, D.C0, D.C1, D.R
from F JOIN D FOR SYSTEM_TIME AS OF F.R ON F.C1 = D.C1

I still do not see any output from the pipeline. The overall logs I see from the connector are the following:

Emit D.D row=+I(0,0,1970-01-01T00:00)@time=0 --> ctx.collectWithTimestamp(row_, rowtime);
Emit D.F row=+I(0,0,1970-01-01T00:00)@time=0
Emit D.D row=+I(1,1,1970-01-01T00:00)@time=0
Emit D.F row=+I(1,1,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(2,2,1970-01-01T00:00)@time=0
Emit D.F row=+I(2,2,1970-01-01T00:00:02)@time=2000
Emit D.F row=+I(3,3,1970-01-01T00:00:03)@time=3000
Emit D.D row=+I(3,3,1970-01-01T00:00)@time=0
Emit D.F row=+I(4,4,1970-01-01T00:00:04)@time=4000
Emit D.F wm=4000 ---> ctx.emitWatermark(new Watermark(wm));
Emit D.D wm=0

Now, if I change the rowtime of table D to 1s instead of 0, I get one row as output.
Emit D.D row=+I(0,0,1970-01-01T00:00:01)@time=1000
Emit D.F row=+I(0,0,1970-01-01T00:00)@time=0
Emit D.F row=+I(1,1,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(1,1,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(2,2,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(3,3,1970-01-01T00:00:01)@time=1000
Emit D.F wm=1000
Emit D.D wm=1000

reply: (1, "1", 1000, 1, "1", 1000)

The next row streamed from F, which should join with a row emitted from D, does not produce any output:

Emit D.F row=+I(2,2,1970-01-01T00:00:02)@time=2000
Emit D.F wm=2000

NO REPLY

My understanding of temporal joins is that the latest row from D should be picked for joining rows from F. Is my expectation that the (2, 2, 2s) row in F joins with the (2, 2, 1s) row in D wrong?

Regards,
Satyam

On Tue, Mar 16, 2021 at 5:54 AM Timo Walther wrote:
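One way to read the second run is through how a two-input operator tracks event time: Flink advances it to the minimum of its inputs' watermarks, and the Flink documentation notes that the event-time temporal join is triggered by watermarks from both sides. The Java sketch below is a simplified, hypothetical model built on that assumption (names invented; it is not Flink's actual join operator) and replays the log above. It does not by itself explain every observation in this thread, for example the empty result of the first run.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified, hypothetical model of "F JOIN D FOR SYSTEM_TIME AS OF F.R"; NOT Flink code.
// Assumptions: the join's event time is min(watermark of F, watermark of D); a buffered
// F row is emitted once that combined watermark is >= F.R; it joins the D version with
// the same C1 and the largest D.R <= F.R (inner join: no version means no output).
class TemporalJoinModel {
    static final class Row {
        final long c0;
        final String c1;
        final long r;

        Row(long c0, String c1, long r) {
            this.c0 = c0;
            this.c1 = c1;
            this.r = r;
        }
    }

    private final List<Row> bufferedF = new ArrayList<>();
    private final List<Row> versionsD = new ArrayList<>();
    private long wmF = Long.MIN_VALUE, wmD = Long.MIN_VALUE;
    final List<String> output = new ArrayList<>();

    void onF(Row row) { bufferedF.add(row); }
    void onD(Row row) { versionsD.add(row); }
    void onWatermarkF(long wm) { wmF = Math.max(wmF, wm); fire(); }
    void onWatermarkD(long wm) { wmD = Math.max(wmD, wm); fire(); }
    int pending() { return bufferedF.size(); }

    private void fire() {
        long combined = Math.min(wmF, wmD); // the slower input holds the join back
        Iterator<Row> it = bufferedF.iterator();
        while (it.hasNext()) {
            Row f = it.next();
            if (f.r > combined) continue; // event time has not reached F.R yet: keep buffered
            Row match = null;
            for (Row d : versionsD) {
                if (d.c1.equals(f.c1) && d.r <= f.r && (match == null || d.r > match.r)) match = d;
            }
            if (match != null) {
                output.add(String.format("(%d, \"%s\", %d, %d, \"%s\", %d)",
                        f.c0, f.c1, f.r, match.c0, match.c1, match.r));
            }
            it.remove();
        }
    }

    public static void main(String[] args) {
        // Replays the second run from the thread (D rowtime = 1 s).
        TemporalJoinModel j = new TemporalJoinModel();
        j.onD(new Row(0, "0", 1000));
        j.onF(new Row(0, "0", 0));
        j.onF(new Row(1, "1", 1000));
        j.onD(new Row(1, "1", 1000));
        j.onD(new Row(2, "2", 1000));
        j.onD(new Row(3, "3", 1000));
        j.onWatermarkF(1000);
        j.onWatermarkD(1000);
        System.out.println(j.output); // [(1, "1", 1000, 1, "1", 1000)]
        j.onF(new Row(2, "2", 2000));
        j.onWatermarkF(2000);         // D's watermark is still 1000
        System.out.println(j.output); // still [(1, "1", 1000, 1, "1", 1000)]
    }
}
```

In this model an F row stays buffered until D's watermark catches up with its rowtime, so a bounded side whose watermark stops at 1000 holds back every later F row. Calling onWatermarkD(2000) in the model releases (2, "2", 2000), which then joins the 1 s version of D, the result expected above.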