Re: subuquery about flink sql


韩宁宁
Thank you for your reply.

I think the table registration is fine; I suspect the problem lies in the subquery.
The following SQL executes without any problem:
select 
        user,
        count(product),
        TUMBLE_START(t, INTERVAL '60' SECOND) as wStart,
        TUMBLE_END(t, INTERVAL '60' SECOND) as wEnd
 from myFlinkTable GROUP BY user,TUMBLE(t, INTERVAL '60' SECOND) 



------------------ Original Message ------------------
From: "李玥"<[hidden email]>;
Sent: Tuesday, April 3, 2018, 11:49 AM
To: "韩宁宁"<[hidden email]>;
Cc: "user"<[hidden email]>; "skycrab68"<[hidden email]>;
Subject: Re: subuquery about flink sql

The exception log says that your table “myFlinkTable” does not contain a column/field named “t”. Something could be wrong with your table registration. It would be helpful to show us your table registration code, like:

// register a Table
tableEnv.registerTable("table1", ...);           // or
tableEnv.registerTableSource("table2", ...);     // or
tableEnv.registerExternalCatalog("extCat", ...);

   


On April 3, 2018, at 11:23 AM, 韩宁宁 <[hidden email]> wrote:

Dear all,
      I have a question about subqueries in Flink SQL.
      My SQL looks like this:
      select
        user,
        count(product),
        TUMBLE_START(t, INTERVAL '60' SECOND) as wStart,
        TUMBLE_END(t, INTERVAL '60' SECOND) as wEnd
    from (
        select
            distinct(user),
            product,
            amount,
            actionTime
        from myFlinkTable
    ) GROUP BY user,TUMBLE(t, INTERVAL '60' SECOND)

    The type information of the fields is as follows:
    TypeInformation<Row> typeInfo = Types.ROW(
                new String[] {"user","product","amount","actionTime"},
                new TypeInformation<?>[] {
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.INT_TYPE_INFO,
                        BasicTypeInfo.INT_TYPE_INFO,
                        BasicTypeInfo.LONG_TYPE_INFO,
                }
        );

    My data source implements DefinedRowtimeAttribute as follows:
    @Override
    public String getRowtimeAttribute() {
        return "t";
    }

    I ran the test code and got the following error:
    Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. At line 13, column 24: Column 't' not found in any table
at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:92)
at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:499)
at com.didi.flink.sql.Main.main(Main.java:103)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: org.apache.calcite.runtime.CalciteContextException: At line 13, column 24: Column 't' not found in any table
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:804)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:789)
at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4363)
at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:258)
at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5018)
at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5000)
at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:344)
at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:134)
at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:101)
at org.apache.calcite.sql.SqlOperator.acceptCall(SqlOperator.java:859)
at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visitScoped(SqlValidatorImpl.java:5053)
at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:50)
at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:33)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:137)
at org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:4624)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateGroupClause(SqlValidatorImpl.java:3529)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3172)
at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:931)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:912)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:220)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:887)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:597)
at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:88)
... 7 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 't' not found in any table
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
... 32 more
==========================================
How can I solve this problem?

Best wishes
Thanks





Re: subuquery about flink sql

杨力
You should add a column
    TUMBLE_ROWTIME(t, INTERVAL '60' SECOND) AS t 
to the select part of your subquery.
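
A note on why the column has to appear in the subquery: the outer query can only refer to columns that the subquery projects, and the subquery in the question projects user, product, amount and actionTime but not t. The following is a toy sketch of that visibility rule in plain Java. It is not how Calcite's validator is implemented, and the class and method names are hypothetical. It also says nothing about whether a projected t would still be usable as a time attribute, which is a separate question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SubqueryColumns {

    /** Returns the outer-query column references that the subquery does not project. */
    static List<String> unresolved(List<String> subqueryProjection, List<String> outerReferences) {
        Set<String> visible = new HashSet<>(subqueryProjection);
        List<String> missing = new ArrayList<>();
        for (String ref : outerReferences) {
            // Report each missing column once, in the order it is first referenced.
            if (!visible.contains(ref) && !missing.contains(ref)) {
                missing.add(ref);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Columns projected by the subquery in the original question.
        List<String> projected = Arrays.asList("user", "product", "amount", "actionTime");
        // Columns the outer query refers to (select list and GROUP BY).
        List<String> referenced = Arrays.asList("user", "product", "t");

        System.out.println(unresolved(projected, referenced)); // prints [t]

        // Hypothetical fix: the subquery also projects a column named t.
        List<String> withT = new ArrayList<>(projected);
        withT.add("t");
        System.out.println(unresolved(withT, referenced));     // prints []
    }
}
```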

On Tuesday, April 3, 2018, at 3:34 PM, 韩宁宁 <[hidden email]> wrote:
Thank you for your reply.

I think the table registration is fine; I suspect the problem lies in the subquery.
The following SQL executes without any problem:
select 
        user,
        count(product),
        TUMBLE_START(t, INTERVAL '60' SECOND) as wStart,
        TUMBLE_END(t, INTERVAL '60' SECOND) as wEnd
 from myFlinkTable GROUP BY user,TUMBLE(t, INTERVAL '60' SECOND) 




Re: subuquery about flink sql

Timo Walther
Hi,

There are multiple issues in your query. First, "SELECT DISTINCT(user), product" is MySQL-specific syntax; it is interpreted as "SELECT DISTINCT user, product", which I guess is not what you want. Second, SQL windows can only be applied to time attributes, meaning:

"As long as a time attribute is not modified and is simply forwarded from one part of the query to another, it remains a valid time attribute. Time attributes behave like regular timestamps and can be accessed for calculations. If a time attribute is used in a calculation, it will be materialized and becomes a regular timestamp. Regular timestamps do not cooperate with Flink’s time and watermarking system and thus can not be used for time-based operations anymore." [1]

So you cannot get a time attribute out of an aggregation (distinct). Could you do the windowing first and maybe do the DISTINCT afterwards?
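
To make the suggested ordering concrete, here is a minimal plain-Java sketch (no Flink involved) of what "window first, deduplicate afterwards" computes: each event is assigned to a 60-second tumbling window based on its timestamp, and distinct products are then counted per user and window. The Event class, the method names and the sample data are hypothetical. The window-start formula assumes non-negative epoch-millisecond timestamps and windows aligned to the epoch, which is my understanding of how TUMBLE behaves.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class WindowThenDistinct {

    static final long WINDOW_MS = 60_000L; // INTERVAL '60' SECOND

    /** Hypothetical event type: (user, product, event time in epoch milliseconds). */
    static final class Event {
        final String user;
        final int product;
        final long ts;

        Event(String user, int product, long ts) {
            this.user = user;
            this.product = product;
            this.ts = ts;
        }
    }

    /** Start of the tumbling window containing ts (assumes ts >= 0 and epoch-aligned windows). */
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_MS);
    }

    /** Counts distinct products per (user, window); keys have the form "user@windowStart". */
    static Map<String, Integer> distinctProductsPerUserWindow(List<Event> events) {
        // Step 1: assign every event to its window and group by (user, window).
        Map<String, Set<Integer>> productsPerGroup = new TreeMap<>();
        for (Event e : events) {
            String key = e.user + "@" + windowStart(e.ts);
            productsPerGroup.computeIfAbsent(key, k -> new HashSet<>()).add(e.product);
        }
        // Step 2: deduplication happens inside each group; the set size is the distinct count.
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, Set<Integer>> entry : productsPerGroup.entrySet()) {
            counts.put(entry.getKey(), entry.getValue().size());
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("alice", 1, 1_000L),
                new Event("alice", 1, 2_000L),   // same product again in the same window
                new Event("alice", 2, 59_999L),  // last millisecond of the first window
                new Event("alice", 1, 60_000L),  // first millisecond of the next window
                new Event("bob", 7, 61_000L));
        // prints {alice@0=2, alice@60000=1, bob@60000=1}
        System.out.println(distinctProductsPerUserWindow(events));
    }
}
```

Because the grouping key is derived directly from the original timestamp, the deduplication never has to pass a time column through an earlier aggregation.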

A hint for development: you can always print the current schema, e.g. tableEnv.sql("...").printSchema();

Regards,
Timo


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/streaming.html#time-attributes


On April 3, 2018, at 10:15, 杨力 wrote:
You should add a column
    TUMBLE_ROWTIME(t, INTERVAL '60' SECOND) AS t 
to the select part of your subquery.
