TUMBLE function with MONTH/YEAR intervals produce milliseconds window unexpectedly

Vinod Mehra
Hi All!

We are using: org.apache.flink:flink-table_2.11:jar:1.4.2:compile

SELECT
  COALESCE(user_id, -1) AS user_id,
  count(id) AS count_per_window,
  sum(amount) AS charge_amount_per_window,
  TUMBLE_START(rowtime, INTERVAL '2' YEAR) AS twindow_start,
  TUMBLE_END(rowtime, INTERVAL '2' YEAR) AS twindow_end
FROM
  event_charge_processed
WHERE capture=true
AND COALESCE(user_id, -1) <> -1
GROUP BY
  TUMBLE(rowtime, INTERVAL '2' YEAR),
  COALESCE(user_id, -1)

With MONTH intervals the query produces millisecond-sized windows: '1' MONTH gives 1 ms windows, '2' MONTH gives 2 ms, '3' MONTH gives 3 ms, …, '1' YEAR gives 12 ms, and '2' YEAR gives 24 ms! This results in incorrect aggregations.

I found that org.apache.calcite.sql.SqlLiteral#getValueAs() treats MONTH/YEAR intervals differently from DAY/HOUR etc. Perhaps the bug is somewhere there?
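To make the reported numbers concrete, here is a hypothetical Java sketch of the unit mix-up. It is not Flink or Calcite code, and `IntervalUnitMixup` and its methods are illustrative names. It assumes, consistent with the 1/12/24 ms windows above, that a YEAR/MONTH interval literal carries a month count while a DAY/HOUR literal carries milliseconds. Using the month count directly as a window size in milliseconds reproduces exactly the reported window sizes.

```java
/**
 * Hypothetical sketch, not Flink or Calcite source code.
 * Assumption: a YEAR/MONTH interval literal carries its value as a number of months,
 * while a DAY/HOUR/MINUTE/SECOND literal carries it as a number of milliseconds.
 */
final class IntervalUnitMixup {

    /** The two interval families, each with its own unit. */
    enum IntervalKind { YEAR_MONTH, DAY_TIME }

    /** Value a YEAR/MONTH literal would carry: total months (assumed representation). */
    static long yearMonthValue(long years, long months) {
        return years * 12 + months;
    }

    /** Value a DAY/TIME literal would carry: total milliseconds (assumed representation). */
    static long dayTimeValue(long days) {
        return days * 24 * 60 * 60 * 1000;
    }

    /** Buggy pattern: treat the literal's number as milliseconds whatever its family. */
    static long windowSizeMillisBuggy(IntervalKind kind, long value) {
        return value; // for YEAR_MONTH, months silently become milliseconds
    }

    /** Guarded pattern: only DAY_TIME values are already milliseconds; refuse the rest. */
    static long windowSizeMillisGuarded(IntervalKind kind, long value) {
        if (kind != IntervalKind.DAY_TIME) {
            throw new IllegalArgumentException("Month-based intervals have no fixed length in milliseconds");
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(windowSizeMillisBuggy(IntervalKind.YEAR_MONTH, yearMonthValue(0, 1))); // 1
        System.out.println(windowSizeMillisBuggy(IntervalKind.YEAR_MONTH, yearMonthValue(1, 0))); // 12
        System.out.println(windowSizeMillisBuggy(IntervalKind.YEAR_MONTH, yearMonthValue(2, 0))); // 24
        System.out.println(windowSizeMillisGuarded(IntervalKind.DAY_TIME, dayTimeValue(30)));     // 2592000000
    }
}
```

The guarded variant shows the alternative of refusing month-based sizes outright instead of misreading them as milliseconds.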

Is this a known issue? Has it been fixed in later versions?

Thanks,
Vinod

Dawid Wysakowicz-2
It should be fixed since version 1.6.3.
Best,


On Thu, 28 Mar 2019, 19:32 Vinod Mehra, <[hidden email]> wrote:

Vinod Mehra
Thanks Dawid! Can you please point me to the JIRA issue that tracked the fix?

Thanks!
Vinod

On Thu, Mar 28, 2019 at 12:46 PM Dawid Wysakowicz <[hidden email]> wrote:

Dawid Wysakowicz-2

On Thu, 28 Mar 2019, 20:48 Vinod Mehra, <[hidden email]> wrote:

Vinod Mehra
Doh! Sorry about that! :)

Thanks again!

On Thu, Mar 28, 2019 at 12:49 PM Dawid Wysakowicz <[hidden email]> wrote:

Vinod Mehra
In reply to this post by Dawid Wysakowicz-2
Dawid, 

With the fix in place, my SQL (which uses TUMBLE(rowtime, INTERVAL '1' MONTH)) now fails with an error:

(now testing with org.apache.flink:flink-table_2.11:jar:1.7.1:compile)
org.apache.flink.table.api.TableException: Only constant window intervals with millisecond resolution are supported.
at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsLong$1(DataStreamLogicalWindowAggregateRule.scala:73)
at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:90)
at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:65)
at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:315)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:415)
at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:252)
at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:211)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:198)
at org.apache.flink.table.api.TableEnvironment.runHepPlanner(TableEnvironment.scala:360)
at org.apache.flink.table.api.TableEnvironment.runHepPlannerSequentially(TableEnvironment.scala:326)
at org.apache.flink.table.api.TableEnvironment.optimizeNormalizeLogicalPlan(TableEnvironment.scala:282)
at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:811)
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:305)
at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:248)

The same exact syntax works fine for DAY intervals. For example:
TUMBLE(rowtime, INTERVAL '30' DAY)

Is the same syntax for MONTH / YEAR intervals not supported?
TUMBLE(rowtime, INTERVAL '1' MONTH)
TUMBLE(rowtime, INTERVAL '1' YEAR)
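A DAY interval has one fixed length in milliseconds, but a MONTH or YEAR does not, which is presumably why a planner that needs a constant millisecond window size rejects them. The following plain java.time sketch (no Flink API involved; the class and method names are made up for illustration) prints those lengths:

```java
import java.time.Duration;
import java.time.Year;
import java.time.YearMonth;

/**
 * Illustrative sketch: a DAY interval converts to one constant number of milliseconds,
 * while the length of a calendar month or year depends on which month or year it is.
 */
final class CalendarLengths {

    /** A day-time interval of n days is always n * 86,400,000 ms. */
    static long dayIntervalMillis(long days) {
        return Duration.ofDays(days).toMillis();
    }

    /** Length of one specific calendar month in milliseconds (28 to 31 days). */
    static long monthMillis(YearMonth month) {
        return Duration.ofDays(month.lengthOfMonth()).toMillis();
    }

    /** Length of one specific calendar year in milliseconds (365 or 366 days). */
    static long yearMillis(Year year) {
        return Duration.ofDays(year.length()).toMillis();
    }

    public static void main(String[] args) {
        System.out.println("INTERVAL '30' DAY = " + dayIntervalMillis(30) + " ms"); // always 2592000000
        for (int m = 1; m <= 4; m++) {
            YearMonth ym = YearMonth.of(2019, m);
            System.out.println(ym + " = " + monthMillis(ym) + " ms"); // varies month to month
        }
        System.out.println("2019 = " + yearMillis(Year.of(2019)) + " ms");
        System.out.println("2020 = " + yearMillis(Year.of(2020)) + " ms"); // leap year
    }
}
```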

Thanks,
Vinod

Vinod Mehra
By the way, the maximum DAY window allowed is 99 days. Beyond that, validation fails here: https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/SqlIntervalQualifier.java#L371
"SQL validation failed. From line 12, column 19 to line 12, column 36: Interval field value 100 exceeds precision of DAY(2) field"

Resetting aggregations on larger windows (month, quarter, year) can be quite useful. Is there a practical limitation in Flink (does state size blow up?) that prevents supporting such large windows?
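The "exceeds precision of DAY(2) field" error above reads like a leading-field precision check: with the default of 2 digits, 99 fits and 100 does not. Below is a hypothetical restatement of that rule, not Calcite's actual implementation; `IntervalPrecision` and `fitsLeadingPrecision` are illustrative names, and the default of 2 is taken from the "DAY(2)" in the message.

```java
/**
 * Hypothetical restatement of the leading-field precision rule behind
 * "Interval field value 100 exceeds precision of DAY(2) field". Not Calcite's code.
 */
final class IntervalPrecision {

    /** Leading precision assumed for an unqualified DAY field, as "DAY(2)" suggests. */
    static final int DEFAULT_LEADING_PRECISION = 2;

    /** True if the unsigned leading field value has at most `precision` decimal digits. */
    static boolean fitsLeadingPrecision(long value, int precision) {
        long limit = 1;
        for (int i = 0; i < precision; i++) {
            limit *= 10; // limit = 10^precision
        }
        return value >= 0 && value < limit;
    }

    public static void main(String[] args) {
        System.out.println(fitsLeadingPrecision(99, DEFAULT_LEADING_PRECISION));  // true
        System.out.println(fitsLeadingPrecision(100, DEFAULT_LEADING_PRECISION)); // false
        System.out.println(fitsLeadingPrecision(100, 3));                         // true, cf. DAY(3)
    }
}
```

Standard SQL lets a query declare a wider leading field, e.g. INTERVAL '100' DAY(3); whether Flink 1.7 accepts that form inside TUMBLE is not confirmed here.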

- Vinod


On Thu, Mar 28, 2019 at 3:24 PM Vinod Mehra <[hidden email]> wrote:

Vinod Mehra
Hi Dawid! 

I have filed a bug for an issue related to this thread: https://jira.apache.org/jira/browse/FLINK-12105 (TUMBLE INTERVAL value errors out for 100 or more value).

How should we go about supporting MONTH and YEAR? If you have ideas, please let me know; I will be happy to work with you on a fix. I know there are complexities around time zones and months/years of variable length.

Can we simply tumble the windows by calendar date, i.e. the same date every month/year? For example, say the current timestamp is April 02 2019 (01:34:00.000 PST). If the tumble window is x MONTHs, can we switch to a new window every x months on the 2nd day at 01:34:00.000 PST? The same would apply to years.
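The date-based proposal above can be sketched with java.time as a window-assignment function. This is a hypothetical illustration, not a Flink API: `CalendarTumble`, `assign`, and the anchoring rule (window k starts at the anchor plus k * x months, in the anchor's time zone) are assumptions. It estimates the window index from whole months elapsed since the anchor, then corrects the estimate, because months have different lengths and `plusMonths` clamps to the last day of shorter months.

```java
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;

/**
 * Hypothetical sketch: calendar-aligned tumbling windows of `windowMonths` months,
 * anchored at a chosen instant in a chosen time zone. Not a Flink API.
 */
final class CalendarTumble {

    /** Half-open window [start, end). */
    static final class Window {
        final ZonedDateTime start;
        final ZonedDateTime end;

        Window(ZonedDateTime start, ZonedDateTime end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Finds k with anchor + k*windowMonths <= ts < anchor + (k+1)*windowMonths. */
    static Window assign(ZonedDateTime ts, ZonedDateTime anchor, int windowMonths) {
        if (windowMonths <= 0) {
            throw new IllegalArgumentException("windowMonths must be positive");
        }
        // Work in the anchor's zone so "same day of month, same wall-clock time" is well defined.
        ZonedDateTime t = ts.withZoneSameInstant(anchor.getZone());
        // First guess for the window index from whole months elapsed since the anchor.
        long k = Math.floorDiv(ChronoUnit.MONTHS.between(anchor, t), (long) windowMonths);
        // Correct the guess: the window must start at or before t ...
        while (anchor.plusMonths(k * windowMonths).isAfter(t)) {
            k--;
        }
        // ... and end strictly after t.
        while (!anchor.plusMonths((k + 1) * windowMonths).isAfter(t)) {
            k++;
        }
        return new Window(anchor.plusMonths(k * windowMonths), anchor.plusMonths((k + 1) * windowMonths));
    }

    public static void main(String[] args) {
        ZoneId la = ZoneId.of("America/Los_Angeles");
        ZonedDateTime anchor = ZonedDateTime.of(2019, 4, 2, 1, 34, 0, 0, la);
        ZonedDateTime event = ZonedDateTime.of(2019, 6, 20, 12, 0, 0, 0, la);
        System.out.println(assign(event, anchor, 1)); // window starting 2019-06-02T01:34
        System.out.println(assign(event, anchor, 3)); // window starting 2019-04-02T01:34
    }
}
```

One design choice worth noting: window bounds are always computed from the anchor rather than chained from the previous window, so an anchor on January 31 yields Jan 31, Feb 28, Mar 31 instead of drifting to the 28th.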

Thanks,
Vinod

On Thu, Mar 28, 2019 at 5:02 PM Vinod Mehra <[hidden email]> wrote: