We have an Avro schema that contains a nested structure, and when querying it with Flink SQL we get the error below.

Exception in thread "main" java.lang.AssertionError
    at org.apache.calcite.sql.parser.SqlParserPos.sum_(SqlParserPos.java:236)
    at org.apache.calcite.sql.parser.SqlParserPos.sum(SqlParserPos.java:226)
    at org.apache.calcite.sql.SqlIdentifier.getComponent(SqlIdentifier.java:232)
    at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:416)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5733)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5718)
    at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)

Example schema:
ROW<`col1` VARCHAR(2147483647), `postalAddress` ROW<`addressLine1` VARCHAR(2147483647), `addressLine2` VARCHAR(2147483647), `addressLine3` VARCHAR(2147483647)>>

Example SQL:
insert into CSVSink
select
  col1,
  postalAddress.addressLine1 as address
from myStream

In Flink SQL, how do we select nested elements?
|
Hi Ramana,

For nested data types, Flink uses dot notation (e.g. a.b.c) to access nested elements. Your SQL syntax looks right. Which Flink version are you using? Could you also post your Avro schema file and DDL?

Best,
Leonard Xu

On June 5, 2020, at 03:34, Ramana Uppala <[hidden email]> wrote:
|
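A toy model may make the dot notation concrete. The Java sketch below is hypothetical: NestedFieldResolver and its methods are illustrative names, not Flink or Calcite APIs. It represents the example ROW schema from the question as nested maps and resolves a path such as postalAddress.addressLine1 one component per nesting level, using exact name matching. Flink's real resolution happens in the Calcite validator, which also has to decide whether the first component is a table name or alias; this sketch ignores that.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Toy model of dot-notation field access on a nested ROW schema.
// Hypothetical helper for illustration only; this is not Flink's or Calcite's resolver.
class NestedFieldResolver {

    // Builds a row schema from (name, type) pairs. A type is either a
    // leaf type name (String) or a nested row (Map).
    static Map<String, Object> row(Object... nameTypePairs) {
        Map<String, Object> fields = new LinkedHashMap<>();
        for (int i = 0; i + 1 < nameTypePairs.length; i += 2) {
            fields.put((String) nameTypePairs[i], nameTypePairs[i + 1]);
        }
        return fields;
    }

    // The example schema from the question.
    static Map<String, Object> exampleSchema() {
        return row(
            "col1", "VARCHAR(2147483647)",
            "postalAddress", row(
                "addressLine1", "VARCHAR(2147483647)",
                "addressLine2", "VARCHAR(2147483647)",
                "addressLine3", "VARCHAR(2147483647)"));
    }

    // Walks one nesting level per dot-separated component, using an exact
    // (case-sensitive) name lookup. Returns the field's type (a leaf type
    // name or a nested row), or empty if any component cannot be resolved.
    @SuppressWarnings("unchecked")
    static Optional<Object> resolve(Map<String, Object> schema, String dottedPath) {
        Object current = schema;
        for (String component : dottedPath.split("\\.")) {
            if (!(current instanceof Map)) {
                return Optional.empty(); // cannot descend into a leaf type
            }
            current = ((Map<String, Object>) current).get(component);
            if (current == null) {
                return Optional.empty(); // no field with this exact name
            }
        }
        return Optional.of(current);
    }

    public static void main(String[] args) {
        Map<String, Object> schema = exampleSchema();
        System.out.println(resolve(schema, "postalAddress.addressLine1")); // Optional[VARCHAR(2147483647)]
        System.out.println(resolve(schema, "postalAddress.zipCode"));      // Optional.empty
    }
}
```

Selecting just postalAddress resolves to the whole nested row, which mirrors the fact that a ROW column can itself be selected; each extra dot goes one level deeper.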
Hi Leonard,

We are using Flink 1.10. I cannot share the complete schema, but it looks like this in the Hive Catalog:

flink.generic.table.schema.1.data-type ROW<`col1` VARCHAR(2147483647), `postalAddress` ROW<`addressLine1` VARCHAR(2147483647), `addressLine2` VARCHAR(2147483647), `addressLine3` VARCHAR(2147483647)>>

Based on the stack trace, the sqlUpdate API validates the SQL statement and throws the above error. Do we need to set any Calcite configuration to support nested types?

Thanks,
Ramana.

On Fri, Jun 5, 2020 at 12:49 AM Leonard Xu <[hidden email]> wrote:
|
Hi Ramana,

Could you help us with a way to reproduce the behaviour? I could not reproduce it locally. The code below works for me just fine:
Best,
Dawid

On 05/06/2020 13:59, Ramana Uppala wrote:
|
Hi Dawid,
This issue has been resolved. From our debugging, we found that the Calcite parser resolves the nested elements as expected, but it expects the case of each field name to match the schema. In this scenario, the case of the fields in our SQL SELECT did not match the case of the schema fields. After fixing the SQL to use the correct case, the query worked as expected.

Is Flink SQL case-sensitive? We don't see any documentation about this. It would be great if all query identifiers were converted to lower case, similar to Hive.

On 2020/06/09 07:58:20, Dawid Wysakowicz <[hidden email]> wrote:
> Hi Ramana,
>
> Could you help us with a way to reproduce the behaviour? I could not
> reproduce it locally. The code below works for me just fine:
>
> StreamExecutionEnvironment exec =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(
>     exec,
>     EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
> tEnv.registerTableSource(
>     "T",
>     new StreamTableSource<Row>() {
>         @Override
>         public TableSchema getTableSchema() {
>             return TableSchema.builder()
>                 .field("f3", DataTypes.ROW(DataTypes.FIELD("nested", DataTypes.STRING())))
>                 .build();
>         }
>
>         @Override
>         public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
>             return execEnv.fromCollection(
>                 Arrays.asList(Row.of(Row.of("ABCDE")))
>             );
>         }
>
>         @Override
>         public DataType getProducedDataType() {
>             return DataTypes.ROW(
>                 DataTypes.FIELD(
>                     "f3",
>                     DataTypes.ROW(DataTypes.FIELD("nested", DataTypes.STRING()))
>                 )
>             );
>         }
>     });
> Table table = tEnv.sqlQuery("SELECT f3.nested FROM T");
> DataStream<Row> result = tEnv.toAppendStream(
>     table,
>     Types.ROW(Types.STRING()));
> result.print();
> exec.execute();
>
> Best,
>
> Dawid
>
> On 05/06/2020 13:59, Ramana Uppala wrote:
|
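Ramana's finding can be modeled the same way. The following hypothetical Java sketch (CaseSensitivityDemo is an illustrative name, not a Flink API) resolves a dotted path against a cut-down version of the example schema in two modes: exact, case-sensitive matching, which is the behaviour the thread reports for Flink SQL by default, and Hive-style matching that lower-cases both the query component and the schema field name before comparing. It only mimics the matching rule; it does not reproduce the AssertionError from the stack trace.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration only; not Flink's or Calcite's identifier matching code.
class CaseSensitivityDemo {

    // Cut-down version of the example schema: nested rows are Maps, leaf types are Strings.
    static Map<String, Object> schema() {
        Map<String, Object> address = new LinkedHashMap<>();
        address.put("addressLine1", "VARCHAR(2147483647)");
        Map<String, Object> root = new LinkedHashMap<>();
        root.put("col1", "VARCHAR(2147483647)");
        root.put("postalAddress", address);
        return root;
    }

    // True if a schema field name matches a query component under the chosen rule.
    static boolean matches(String fieldName, String component, boolean caseSensitive) {
        if (caseSensitive) {
            return fieldName.equals(component); // exact match
        }
        // Hive-style: compare lower-cased forms of both names
        return fieldName.toLowerCase(Locale.ROOT).equals(component.toLowerCase(Locale.ROOT));
    }

    // Resolves a dotted path one nesting level at a time; empty means "no such field".
    @SuppressWarnings("unchecked")
    static Optional<Object> resolve(Map<String, Object> row, String dottedPath, boolean caseSensitive) {
        Object current = row;
        for (String component : dottedPath.split("\\.")) {
            if (!(current instanceof Map)) {
                return Optional.empty(); // cannot descend into a leaf type
            }
            Object next = null;
            for (Map.Entry<String, Object> field : ((Map<String, Object>) current).entrySet()) {
                if (matches(field.getKey(), component, caseSensitive)) {
                    next = field.getValue(); // first match wins in this sketch
                    break;
                }
            }
            if (next == null) {
                return Optional.empty();
            }
            current = next;
        }
        return Optional.of(current);
    }

    public static void main(String[] args) {
        Map<String, Object> s = schema();
        String query = "postaladdress.addressline1"; // case differs from the schema
        System.out.println(resolve(s, query, true));  // Optional.empty
        System.out.println(resolve(s, query, false)); // Optional[VARCHAR(2147483647)]
        System.out.println(resolve(s, "postalAddress.addressLine1", true)); // Optional[VARCHAR(2147483647)]
    }
}
```

One design caveat of lower-casing: a row with two fields that differ only in case (for example addressLine1 and AddressLine1) becomes ambiguous. This sketch simply takes the first match, whereas a real engine would have to reject or disambiguate such a reference.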
Hi Ramana,
Happy to hear you've resolved your problem. If you post your SQL next time, a question like this may get a quicker response. Flink SQL is case-sensitive by default, and there is an issue tracking this [1]. I think it makes sense to document this in the SQL section of the docs.

Best,
Leonard Xu
|