Problem with SQL-API and nested objects in case class

Problem with SQL-API and nested objects in case class

Lothium
Hey,
I need some help with the SQL and Table API. I'm using Apache Flink
1.3.2 with Scala 2.11.11 (also tried 2.11.8). I created a DataStream
(based on a Scala case class) and registered it as a table. The case
class includes some Lists, because the underlying JSON has some arrays in
the schema. I had a look at the SQL API documentation, which has an
example with CROSS JOIN and the UNNEST function, so I tried the following
SQL query:
"SELECT t.item.* FROM product, UNNEST(entity.items) AS t (item)".

When I run the stream I get the following error:
"Exception in thread "main" java.lang.AssertionError: Cycle detected during type-checking
        at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:93)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:931)
        at org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115)
        at org.apache.calcite.sql.validate.AbstractNamespace.getRowTypeSansSystemColumns(AbstractNamespace.java:122)
        at org.apache.calcite.sql.validate.AliasNamespace.validateImpl(AliasNamespace.java:71)
        at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:931)
        at org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115)
        at org.apache.calcite.sql.validate.AliasNamespace.getRowType(AliasNamespace.java:41)
        at org.apache.calcite.sql.validate.DelegatingScope.resolveInNamespace(DelegatingScope.java:101)
        at org.apache.calcite.sql.validate.ListScope.resolve(ListScope.java:191)
        at org.apache.calcite.sql.validate.ListScope.findQualifyingTableNames(ListScope.java:156)
        at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:325)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateIdentifier(SqlValidatorImpl.java:2765)
        at org.apache.calcite.sql.SqlIdentifier.validateExpr(SqlIdentifier.java:324)
        at org.apache.calcite.sql.SqlOperator.validateCall(SqlOperator.java:407)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateCall(SqlValidatorImpl.java:4603)
        at org.apache.calcite.sql.validate.UnnestNamespace.validateImpl(UnnestNamespace.java:52)
        at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:931)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:912)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:2941)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:2926)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:2963)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:2935)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3168)
        at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
        at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:931)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:912)
        at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:220)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:887)
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:597)
        at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:88)
        at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:499)
...."

Do you have any idea about this? The stream works when I do a simple select
without the unnest. Is there also a way to do the unnest via the
Table API? I haven't found an example or an operator for this.

Thanks in advance!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Problem with SQL-API and nested objects in case class

Timo Walther
Hi,

thanks for writing on the mailing list. I could reproduce your error
and opened an issue for it
(https://issues.apache.org/jira/browse/FLINK-8107). UNNEST currently
only supports unnesting and joining an array of the same relation.
However, joining two relations will be supported soon (window joins in
1.4 and full joins in 1.5). I don't know your exact use case; maybe you
can explain a bit more what you want to do? Otherwise it might be worth
looking into user-defined table functions, because they can also be quite
useful to unroll arrays.

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/udfs.html#table-functions
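
To make the idea concrete, here is a minimal sketch in plain Scala of the contract a table function follows: an eval method calls collect once per output row, and a lateral join pairs each input row with the rows emitted for it. MiniTableFunction, Unroll and lateralJoin are hypothetical stand-ins written for illustration only; they are not Flink classes, and the real TableFunction is described at the link above.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a table function; it only mimics the
// eval/collect contract and is NOT the Flink class.
abstract class MiniTableFunction[T] {
  private val buffer = ArrayBuffer.empty[T]

  // Emit one output row.
  protected def collect(row: T): Unit = { buffer += row; () }

  // Return everything emitted since the last call and clear the buffer.
  def drain(): List[T] = {
    val out = buffer.toList
    buffer.clear()
    out
  }
}

// Emits one row per array element.
class Unroll extends MiniTableFunction[String] {
  def eval(items: List[String]): Unit = items.foreach(item => collect(item))
}

// Pair every input row with each row the function emits for it.
// Rows whose array is empty produce no output.
def lateralJoin(rows: List[(String, List[String])], fn: Unroll): List[(String, String)] =
  rows.flatMap { case (name, items) =>
    fn.eval(items)
    fn.drain().map(item => (name, item))
  }

val joined = lateralJoin(List(("p1", List("a", "b")), ("p2", Nil)), new Unroll)
```

In this sketch a row with an empty array simply disappears from the result, which matches cross-join semantics; a left outer variant would keep it with a null on the right side.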

I hope that helps.

Regards,
Timo


(In reply to Lothium's message of 11/19/17, 6:59 PM.)


Re: Problem with SQL-API and nested objects in case class

Lothium
Hey Timo,
thanks for your warm welcome and for creating a ticket to fix this!
My scenario is the following:
I receive different JSON entities from an AMQP queue. I have a source to
collect the events. After that I parse them into the different internal case
classes and split the stream by entity type via the split function.
Then I want to transform the different entities and write the new JSON
format (per entity type) to a file sink.
I know I can transform the case classes into other case classes
in Scala code, but I thought it would sometimes be easier to do the
transformations and also aggregations via the SQL or Table API. However,
some of the JSON arrays contain objects, and this seems to be the
problem currently.
I will have a look at the UDFs; maybe I can write something for my purpose.

Thanks!



Re: Problem with SQL-API and nested objects in case class

Timo Walther
Actually, your use case should be doable with Flink's Table & SQL API
with some additional UDFs. The API can handle JSON objects if they are
valid composite types, and you can access arrays as well. The splitting
might be a bit tricky in SQL; you could model it simply as a where()
clause or maybe a groupBy().
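
As a rough illustration of modelling the split as a filter, here is a plain Scala sketch on an in-memory list. The type names (Entity, ProductEntity, CustomerEntity) and the use of the envelope name as the discriminator are assumptions made for this example; they do not come from the thread.

```scala
// Hypothetical entity types, for illustration only.
sealed trait Entity
case class ProductEntity(name: String, items: List[String]) extends Entity
case class CustomerEntity(name: String) extends Entity
case class Envelope(name: String, entity: Entity)

val events: List[Envelope] = List(
  Envelope("product", ProductEntity("p1", List("a"))),
  Envelope("customer", CustomerEntity("c1")),
  Envelope("product", ProductEntity("p2", Nil))
)

// Each "branch" of the split is just a filter on the entity type,
// comparable to a WHERE name = 'product' clause in SQL.
val products: List[ProductEntity] =
  events.collect { case Envelope("product", p: ProductEntity) => p }

val customers: List[CustomerEntity] =
  events.collect { case Envelope("customer", c: CustomerEntity) => c }
```

Every branch reads the full input and keeps only its own rows, so the number of filters grows with the number of entity types.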

If you experience additional limitations or problems, let us know about it.

Good luck.

Regards,
Timo


(In reply to Lothium's message of 11/20/17, 9:16 PM.)


Re: Problem with SQL-API and nested objects in case class

Lothium
Hey Timo,
when I try to access the array in the case class via the SQL syntax, I get
back an error that the syntax is invalid. Here is an example of the case
class structure:

case class Envelope(name: String, entity: Product)
case class Product(name: String, items: List[Item])
case class Item(attr1: String, attr2: String, attr3: Option[String])

The input stream is of type "Envelope" and I try to run the following query:
"SELECT entity.items[1].attr1 FROM product". Querying the name of the
product, for example, is not a problem. Is this really invalid syntax? I
had a look at the documentation and I thought it would work like this.
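
For comparison, this is the element I am trying to reach, written in plain Scala. The sketch assumes that array positions in SQL start at 1 (that is how I read the documentation), while a Scala List starts at 0; sqlElementAt is a hypothetical helper for this example, not part of any API.

```scala
case class Item(attr1: String, attr2: String, attr3: Option[String])
case class Product(name: String, items: List[Item])
case class Envelope(name: String, entity: Product)

// Look up a list element by its 1-based SQL position.
// Returns None when the position is out of range.
def sqlElementAt[A](xs: List[A], sqlIndex: Int): Option[A] =
  xs.lift(sqlIndex - 1)

val envelope = Envelope(
  "product",
  Product("p1", List(Item("a1", "a2", None), Item("b1", "b2", Some("b3"))))
)

// What entity.items[1].attr1 is meant to select.
val first: Option[String] = sqlElementAt(envelope.entity.items, 1).map(_.attr1)
```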

I also tried to create a UDF to unnest the array, the function looks like
this:

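import org.apache.flink.table.functions.TableFunction

// Table function that emits one row per item of the given product.
class UnnestArray extends TableFunction[Item] {
  def eval(product: Product): Unit = {
    product.items.foreach(item => collect(item))
  }
}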

When I use this with the "LATERAL TABLE" expression, I get the following
error:
org.apache.flink.table.api.ValidationException: SQL validation failed. From
line 1, column 55 to line 1, column 75: No match found for function
signature unnest_array(<COMPOSITE<my case class definition>

Do you have any idea about that?
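
One workaround I am considering is to flatten the items in Scala before registering the table, so that the table only contains simple fields. Below is a sketch of that idea on an in-memory list; FlatItem and flatten are hypothetical names, and I have not verified this against Flink 1.3.2. On a DataStream the same function could presumably be passed to flatMap.

```scala
case class Item(attr1: String, attr2: String, attr3: Option[String])
case class Product(name: String, items: List[Item])
case class Envelope(name: String, entity: Product)

// Hypothetical flat row type: one row per item, no nested collections.
case class FlatItem(productName: String, attr1: String, attr2: String, attr3: Option[String])

// Turn one envelope into one flat row per contained item.
def flatten(e: Envelope): List[FlatItem] =
  e.entity.items.map(i => FlatItem(e.entity.name, i.attr1, i.attr2, i.attr3))

val input = List(
  Envelope("product", Product("p1", List(Item("a1", "a2", None), Item("b1", "b2", Some("b3"))))),
  Envelope("product", Product("p2", Nil))
)

val flat: List[FlatItem] = input.flatMap(flatten)
```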

Thanks!


