Hey,
I need some help regarding the SQL and Table API. I'm using Apache Flink 1.3.2 with Scala 2.11.11 (I also tried 2.11.8). I created a DataStream (based on a Scala case class) and registered it as a table. The case class includes some Lists, because the underlying JSON schema contains arrays. The SQL API documentation has an example with CROSS JOIN and the UNNEST function, so I tried the following query:

SELECT t.item.* FROM product, UNNEST(entity.items) AS t (item)

When I run the stream I get the following error:

Exception in thread "main" java.lang.AssertionError: Cycle detected during type-checking
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:93)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:931)
    at org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115)
    at org.apache.calcite.sql.validate.AbstractNamespace.getRowTypeSansSystemColumns(AbstractNamespace.java:122)
    at org.apache.calcite.sql.validate.AliasNamespace.validateImpl(AliasNamespace.java:71)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:931)
    at org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115)
    at org.apache.calcite.sql.validate.AliasNamespace.getRowType(AliasNamespace.java:41)
    at org.apache.calcite.sql.validate.DelegatingScope.resolveInNamespace(DelegatingScope.java:101)
    at org.apache.calcite.sql.validate.ListScope.resolve(ListScope.java:191)
    at org.apache.calcite.sql.validate.ListScope.findQualifyingTableNames(ListScope.java:156)
    at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:325)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateIdentifier(SqlValidatorImpl.java:2765)
    at org.apache.calcite.sql.SqlIdentifier.validateExpr(SqlIdentifier.java:324)
    at org.apache.calcite.sql.SqlOperator.validateCall(SqlOperator.java:407)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateCall(SqlValidatorImpl.java:4603)
    at org.apache.calcite.sql.validate.UnnestNamespace.validateImpl(UnnestNamespace.java:52)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:931)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:912)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:2941)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:2926)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:2963)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:2935)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3168)
    at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:931)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:912)
    at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:220)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:887)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:597)
    at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:88)
    at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:499)
    ...

Do you have any idea about this? The stream works when I do a simple SELECT without the UNNEST. Is there also a way to do the unnest via the Table API? I haven't found an example or an operator for it.

Thanks in advance!
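For concreteness, a minimal sketch of the setup described above, using the case classes that appear later in this thread; the sample element and names like UnnestRepro are illustrative, not from the original post. The assertion is thrown at validation time, when sql() is called, so no execute() is needed to reproduce it:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

case class Item(attr1: String, attr2: String, attr3: Option[String])
case class Product(name: String, items: List[Item])
case class Envelope(name: String, entity: Product)

object UnnestRepro {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Stand-in for the real AMQP source:
    val stream: DataStream[Envelope] = env.fromElements(
      Envelope("p", Product("gadget", List(Item("a1", "a2", None)))))

    tEnv.registerDataStream("product", stream)

    // A plain projection validates fine...
    val ok = tEnv.sql("SELECT entity.name FROM product")

    // ...but the UNNEST query triggers the Calcite
    // "Cycle detected during type-checking" assertion in 1.3.2:
    val broken = tEnv.sql(
      "SELECT t.item.* FROM product, UNNEST(entity.items) AS t (item)")
  }
}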
Hi,
thanks for writing on the mailing list. I could reproduce your error and opened an issue for it: https://issues.apache.org/jira/browse/FLINK-8107

UNNEST currently only supports unnesting and joining an array of the same relation. However, joining two relations will be supported soon (window joins in 1.4 and full joins in 1.5). I don't know your exact use case; maybe you can explain a bit more what you want to do? Otherwise it might be worth looking into user-defined table functions, because they can also be quite useful to unroll arrays:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/udfs.html#table-functions

I hope that helps.

Regards,
Timo

On 11/19/17 6:59 PM, Lothium wrote:
> Hey,
> I need some help regarding the SQL and Table API. [...]
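As a rough illustration of the table-function approach mentioned above, here is a sketch modeled loosely on the Split example from the linked docs page; the class name, separator, and the commented-out registration lines are illustrative:

import org.apache.flink.table.functions.TableFunction

// Emits one row per substring of the input column.
class Split(separator: String) extends TableFunction[String] {
  def eval(line: String): Unit = {
    line.split(separator).foreach(collect)
  }
}

// Registration and use (tEnv is a StreamTableEnvironment):
//   tEnv.registerFunction("split", new Split(","))
//   tEnv.sql("SELECT w FROM lines, LATERAL TABLE(split(line)) AS T(w)")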
Hey Timo,
thanks for the warm welcome and for creating a ticket to fix this! My scenario is the following: I receive different JSON entities from an AMQP queue. I have a source that collects the events; after that I parse them into different internal case classes and split the stream by entity type via the split function. After that I want to transform the different entities and write the new JSON format (per entity type) to a file sink.

I know I can do the transformation of the case classes to other case classes via Scala code, but I thought it would sometimes be easier to do the transformations (and also aggregations) via the SQL or Table API. However, there are sometimes JSON arrays with objects in them, and that seems to be the problem currently. I will have a look at the UDFs; maybe I can write something for my purpose.

Thanks!
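A hedged sketch of the split step described above; the event types and selector names here are hypothetical stand-ins for the real case classes:

import org.apache.flink.streaming.api.scala._

// Hypothetical parsed event types; the real case classes differ.
sealed trait Event
case class ProductEvent(id: String) extends Event
case class CustomerEvent(id: String) extends Event

// Assuming `events: DataStream[Event]` comes from the AMQP source + parser:
def splitByType(events: DataStream[Event]): (DataStream[Event], DataStream[Event]) = {
  val splitStream = events.split { e =>
    e match {
      case _: ProductEvent  => List("product")
      case _: CustomerEvent => List("customer")
    }
  }
  (splitStream.select("product"), splitStream.select("customer"))
}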
Actually, your use case should be doable with Flink's Table & SQL API plus some additional UDFs. The API can handle JSON objects as long as they are valid composite types, and you can access arrays as well. The splitting might be a bit tricky in SQL; you could model it simply as a where() clause or maybe a groupBy(). If you experience additional limitations or problems, let us know. Good luck!

Regards,
Timo

On 11/20/17 9:16 PM, Lothium wrote:
> Hey Timo,
> thanks for the warm welcome and for creating a ticket to fix this! [...]
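To make the where() suggestion concrete, a small hypothetical sketch; the table name "events" and the entityType discriminator column are assumptions, not from the thread:

import org.apache.flink.table.api.scala._

// tEnv: StreamTableEnvironment, assumed in scope. All parsed events
// registered as one table with a hypothetical entityType column:
val events = tEnv.scan("events")
val products  = events.where('entityType === "product")
val customers = events.where('entityType === "customer")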
Hey Timo,
when I try to access the array in the case class via SQL, I get an error that the syntax is invalid. Here is an example of the case class structure:

case class Envelope(name: String, entity: Product)
case class Product(name: String, items: List[Item])
case class Item(attr1: String, attr2: String, attr3: Option[String])

The input stream is of type Envelope, and I try to run the following query:

SELECT entity.items[1].attr1 FROM product

Querying the name of the product is not a problem, for example. Is this really invalid syntax? From the documentation I thought it would work like this. I also tried to create a UDF to unnest the array; the function looks like this:

class UnnestArray() extends TableFunction[Item] {
  def eval(product: Product): Unit = {
    product.items.foreach(f => collect(f))
  }
}

When I use it with the LATERAL TABLE expression, I get the following error:

org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 55 to line 1, column 75: No match found for function signature unnest_array(<COMPOSITE<my case class definition>

Do you have any idea about that? Thanks!
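One hedged guess, sketched below: the ValidationException suggests the planner cannot match an eval signature that takes the whole Product composite. A variant whose eval takes the array column directly, registered under the exact name used in the SQL text, might look like this (assuming the Item case class above is in scope; whether this resolves the signature lookup in 1.3.2 is an assumption, not verified here):

import org.apache.flink.table.functions.TableFunction

// Takes the array column itself instead of the enclosing Product row.
class UnnestItems extends TableFunction[Item] {
  def eval(items: Array[Item]): Unit = {
    items.foreach(collect)
  }
}

// Registration must use the name that appears in the SQL text:
//   tEnv.registerFunction("unnest_array", new UnnestItems)
//   tEnv.sql("SELECT attr1 FROM product, " +
//     "LATERAL TABLE(unnest_array(entity.items)) AS t(attr1, attr2, attr3)")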