Flink group with time-windowed join


jeremyji
I have two streams registered as table1 and table2. We know that GROUP BY over
a regular join won't work, so we have to use a time-windowed join. Here is what
my Flink SQL looks like:

SELECT
        a.account account,
        SUM(a.value) + SUM(b.value),
        UNIX_TIMESTAMP(TUMBLE_START(a.producer_timestamp, INTERVAL '3' MINUTE))
FROM
        (SELECT
                account,
                value,
                producer_timestamp
        FROM
                table1) a,
        (SELECT
                account,
                value,
                producer_timestamp
        FROM
                table2) b
WHERE
        a.account = b.account AND
        a.producer_timestamp BETWEEN b.producer_timestamp - INTERVAL '3' MINUTE AND b.producer_timestamp
group by
        a.account,
        TUMBLE(a.producer_timestamp, INTERVAL '3' MINUTE)

But I still get an error from Flink:

Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
Please check the documentation for the set of currently supported SQL features.
        at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:450)
        at org.apache.flink.table.api.TableEnvironment.optimizePhysicalPlan(TableEnvironment.scala:369)
        at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:814)
        at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
        at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:344)
        at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:1048)
        at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:962)
        at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:922)
...


I think I am using a time-windowed join just as this doc describes:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#time-windowed-joins
But Flink tells me it is a regular join. Is there anything wrong that I haven't
noticed?
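
To make the intended result concrete, here is a minimal pure-Python sketch of what the query above is meant to compute. It does not use Flink and ignores watermarks, late data and NULLs. The names tumble_start and windowed_join_sum and the sample rows are hypothetical, and it assumes tumbling windows are aligned to the Unix epoch.

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=3)
EPOCH = datetime(1970, 1, 1)


def tumble_start(ts, size=WINDOW):
    """Start of the tumbling window that contains ts (assumed epoch-aligned)."""
    return ts - (ts - EPOCH) % size


def windowed_join_sum(table1, table2, bound=WINDOW):
    """Join on account where b.ts - bound <= a.ts <= b.ts,
    then sum a.value + b.value per (account, window start of a.ts)."""
    totals = defaultdict(int)
    for a_account, a_value, a_ts in table1:
        for b_account, b_value, b_ts in table2:
            if a_account == b_account and b_ts - bound <= a_ts <= b_ts:
                # Summing a.value + b.value over the joined rows equals
                # SUM(a.value) + SUM(b.value) when no value is NULL.
                totals[(a_account, tumble_start(a_ts))] += a_value + b_value
    return dict(totals)


# Hypothetical sample rows: (account, value, producer_timestamp)
table1 = [
    ("acc1", 10, datetime(2020, 1, 2, 9, 0, 30)),
    ("acc1", 5, datetime(2020, 1, 2, 9, 4, 0)),
]
table2 = [
    ("acc1", 1, datetime(2020, 1, 2, 9, 1, 0)),
    ("acc1", 2, datetime(2020, 1, 2, 9, 10, 0)),
]

result = windowed_join_sum(table1, table2)
```

Only the first table1 row has a table2 partner within the 3-minute bound, so only one (account, window) group appears in result.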


--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Flink group with time-windowed join

Kurt Young
Looks like a bug to me. Could you file an issue for this?

Best,
Kurt


On Thu, Jan 2, 2020 at 9:06 PM jeremyji <[hidden email]> wrote:

Re: Flink group with time-windowed join

Dawid Wysakowicz-2

Hi,

Could you check your query again? I could not reproduce your issue on latest master. I had to adjust your query slightly though:

SELECT
        a.account,
        (SUM(a.`value`) + SUM(b.`value`)) as `result`,
        TUMBLE_START(a.producer_timestamp, INTERVAL '3' MINUTE)
FROM
        (SELECT
                account,
                `value`,
                producer_timestamp
        FROM
                table1) a,
        (SELECT
                account,
                `value`,
                producer_timestamp
        FROM
                table2) b
WHERE
        a.account = b.account AND
        a.producer_timestamp BETWEEN b.producer_timestamp - INTERVAL '3' MINUTE AND b.producer_timestamp
group by
        a.account,
        TUMBLE(a.producer_timestamp, INTERVAL '3' MINUTE)
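
The backticks in the adjusted query are there because `value` and `result` collide with reserved SQL keywords. As a rough illustration, a query builder could quote such identifiers with a helper like the one below. This is only a sketch: quote_identifier is a hypothetical name, the RESERVED set holds just the two keywords that appear in this thread (not the full list), and names that already contain a backtick are not handled.

```python
# Illustrative subset only: the two keywords seen in this thread.
RESERVED = {"VALUE", "RESULT"}


def quote_identifier(name, reserved=RESERVED):
    """Wrap name in backticks when it matches a reserved keyword (case-insensitive)."""
    if name.upper() in reserved:
        return "`" + name + "`"
    return name


columns = ["account", "value", "producer_timestamp"]
select_list = ", ".join(quote_identifier(c) for c in columns)
```

With the sample columns, only value ends up quoted in select_list.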

Best,

Dawid

On 04/01/2020 04:06, Kurt Young wrote:
Looks like a bug to me. Could you file an issue for this?

Best,
Kurt



Re: Flink group with time-windowed join

jeremyji
Hi Dawid,

I simplified my SQL. The original query is more complex and contains an UNNEST
select, like this:

SELECT
        a.account,
        (SUM(a.value) + SUM(b.value)) as result,
        TUMBLE_START(a.producer_timestamp, INTERVAL '3' MINUTE)
FROM
        (SELECT
                account,
                value,
                producer_timestamp
        FROM
                table1) a,
        (SELECT
                account,
                value,
                producer_timestamp
        FROM
                table2,
                unnest(table2.row_array) as T(account, value)) b
WHERE
        a.account = b.account AND
        a.producer_timestamp BETWEEN b.producer_timestamp - INTERVAL '3' MINUTE AND b.producer_timestamp
group by
        a.account,
        TUMBLE(a.producer_timestamp, INTERVAL '3' MINUTE)

table2 has a column row_array, which is an array of rows; each row has two
fields: account and value.
producer_timestamp is the time attribute and is a column of table2.
BTW, my Flink version is 1.7.
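
To illustrate the shape of the data, here is a small pure-Python sketch of what the UNNEST subquery is expected to produce: one (account, value, producer_timestamp) row per element of row_array, with the timestamp copied from the enclosing table2 row. It does not use Flink; unnest_row_array and the sample rows are hypothetical.

```python
from datetime import datetime


def unnest_row_array(table2):
    """Yield one (account, value, producer_timestamp) row per row_array element."""
    for row_array, producer_timestamp in table2:
        for account, value in row_array:
            yield account, value, producer_timestamp


# Hypothetical table2 rows: (row_array, producer_timestamp)
sample_table2 = [
    ([("acc1", 1), ("acc2", 7)], datetime(2020, 1, 2, 9, 1, 0)),
    ([], datetime(2020, 1, 2, 9, 2, 0)),  # an empty array contributes no rows
    ([("acc1", 2)], datetime(2020, 1, 2, 9, 10, 0)),
]

flattened = list(unnest_row_array(sample_table2))
```

The flattened rows have the same layout as the rows of subquery b in the simplified query.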


