Re: Registering RawType with LegacyTypeInfo via Flink CREATE TABLE DDL

Posted by Timo Walther on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Registering-RawType-with-LegacyTypeInfo-via-Flink-CREATE-TABLE-DDL-tp40289p40330.html

I would recommend using the old UDF stack for now. You can simply call
`StreamTableEnvironment.registerFunction` instead of
`createTemporarySystemFunction`. The UDF then returns a legacy type that
the DataStream API understands.
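
A rough, untested sketch of what that can look like (assuming Flink 1.12
and Scala; the `ParseJson` class and the io.circe calls are placeholders
for your actual UDF):

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.table.functions.ScalarFunction

    // Old-stack scalar UDF: getResultType returns TypeInformation, so the
    // result stays a legacy/generic type that toAppendStream understands.
    class ParseJson extends ScalarFunction {
      // simplified error handling, for illustration only
      def eval(s: String): io.circe.Json =
        io.circe.parser.parse(s).getOrElse(io.circe.Json.Null)

      override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
        TypeInformation.of(classOf[io.circe.Json])
    }

    // old registration API instead of createTemporarySystemFunction:
    // tableEnv.registerFunction("PARSE_JSON", new ParseJson)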

Have you thought about implementing your own catalog instead of
generating CREATE TABLE statements? The catalog API seems a bit complex
at first glance but only requires implementing a couple of methods. You
can provide your own `CatalogTable`, which is the parsed representation
of a `CREATE TABLE` statement, and thereby keep full control over the
entire type stack end-to-end.
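
For illustration, a rough sketch of the programmatic route, using the
built-in `GenericInMemoryCatalog` and `CatalogTableImpl` instead of a
fully custom `Catalog` (the catalog, table, and field names as well as
the connector options are made up):

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.table.api.{DataTypes, TableSchema}
    import org.apache.flink.table.catalog.{CatalogTableImpl, GenericInMemoryCatalog, ObjectPath}
    import scala.collection.JavaConverters._

    // In-memory catalog; a custom Catalog implementation plugs in the same way.
    val catalog = new GenericInMemoryCatalog("my_catalog")

    val schema = TableSchema.builder()
      .field("id", DataTypes.STRING())
      // RAW backed by TypeInformation keeps the concrete class instead of a
      // string-serialized LEGACY(...) representation.
      .field("payload", DataTypes.RAW(TypeInformation.of(classOf[io.circe.Json])))
      .build()

    val table = new CatalogTableImpl(
      schema,
      Map("connector" -> "my-custom-sink").asJava,
      "table registered without CREATE TABLE DDL")

    catalog.createTable(new ObjectPath("default", "json_events"), table, false)

    // tableEnv.registerCatalog("my_catalog", catalog)
    // tableEnv.useCatalog("my_catalog")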

Regards,
Timo

On 28.12.20 10:36, Yuval Itzchakov wrote:

> Timo, an additional question.
>
> I am currently using TypeConversions.fromLegacyInfoToDataType. However,
> this still does not allow me to register this table with the catalog,
> since no representation of LEGACY is supported (I don't see it
> generating any DataType other than RAW(.., LEGACY)). Is there any way
> I can preserve the underlying type and still register the column
> somehow with CREATE TABLE?
>
> On Mon, Dec 28, 2020 at 11:22 AM Yuval Itzchakov <[hidden email]> wrote:
>
>     Hi Timo,
>
>     Thanks for the explanation. Passing around a byte array is not
>     possible since I need to know the concrete type later for
>     serialization in my sink, so I need to keep the type.
>
>     What I'm trying to achieve is the following:
>     I have a scalar UDF:
>
>     [inline image: definition of the PARSE_JSON scalar UDF]
>
>     This function is later used when processing a Flink table, by
>     calling PARSE_JSON on the selected field.
>     Whoever uses these UDFs isn't aware of any Flink syntax for creating
>     and registering tables; I generate the CREATE TABLE statement behind
>     the scenes, dynamically, for each table. In order for this to work,
>     I need the UDF to output the correct type (in this example,
>     io.circe.Json). Later, this JSON type (or any other type returned by
>     the UDF, for that matter) will get serialized in a custom sink and
>     written into a data warehouse (that's why I need to keep the concrete
>     type, since serialization happens at the edges before writing it out).
>
>     The reason I'm converting between Table and DataStream is that this
>     table will be further manipulated before being written to the custom
>     DynamicTableSink: transformations are applied on a DataStream[Row],
>     which is then converted back to a Table to be used in an additional
>     CREATE TABLE and then an INSERT INTO statement.
>     This is why I have these conversions back and forth, and why I need
>     some way to register the legacy types as valid column types of the
>     table.
>     Hope that clarifies things a bit; since the pipeline is rather
>     complex, I can't really share an MVCE of it.
>
>     On Mon, Dec 28, 2020 at 11:08 AM Timo Walther <[hidden email]> wrote:
>
>         Hi Yuval,
>
>         the legacy type has no string representation that can be used in
>         a SQL DDL statement. The current string representation LEGACY(...)
>         is only a temporary workaround to persist the old types in
>         catalogs.
>
>         Until FLIP-136 is fully implemented, toAppendStream/toRetractStream
>         support only legacy type info. So I would recommend using the
>         legacy type as the UDF return type as well. Either you use the old
>         `getResultType` method or you override `getTypeInference` and call
>         `TypeConversions.fromLegacyInfoToDataType`.
>
>         Another workaround would be to simply use `BYTES` as the return
>         type and pass around a byte array instead.
>
>         Maybe you can show us a little end-to-end example of what you are
>         trying to achieve?
>
>         Regards,
>         Timo
>
>
>         On 28.12.20 07:47, Yuval Itzchakov wrote:
>          > Hi Danny,
>          >
>          > Yes, I tried implementing the DataTypeFactory for the UDF using
>          > TypeInformationRawType (which is deprecated, BTW, and there's no
>          > support for RawType in the conversion), but it didn't help.
>          >
>          > I did manage to get the conversion working using
>          > TableEnvironment.toAppendStream (I was previously calling
>          > TypeConversions directly), but the problem remains that Flink
>          > can't register LEGACY types via the CREATE TABLE DDL.
>          >
>          > On Mon, Dec 28, 2020, 04:25 Danny Chan <[hidden email]> wrote:
>          >
>          >      > SQL parse failed. Encount
>          >     What syntax did you use?
>          >
>          >      > TypeConversions.fromDataTypeToLegacyInfo cannot convert
>          >      > a plain RAW type back to TypeInformation.
>          >
>          >     Did you try to construct the type information with a fresh
>          >     TypeInformationRawType?
>          >
>          >     On Thu, Dec 24, 2020 at 7:24 PM, Yuval Itzchakov
>          >     <[hidden email]> wrote:
>          >
>          >         An expansion to my question:
>          >
>          >         What I really want is for the UDF to return a
>          >         `RAW(io.circe.Json, ?)` type, but I have to do a
>          >         conversion between Table and DataStream, and
>          >         TypeConversions.fromDataTypeToLegacyInfo cannot convert
>          >         a plain RAW type back to TypeInformation.
>          >
>          >         On Thu, Dec 24, 2020 at 12:59 PM Yuval Itzchakov
>          >         <[hidden email]> wrote:
>          >
>          >             Hi,
>          >
>          >             I have a UDF which returns a type of MAP<STRING,
>          >             LEGACY('RAW', 'ANY<io.circe.Json>')>. When I try to
>          >             register this type with Flink via the CREATE TABLE
>          >             DDL, I encounter an exception:
>          >
>          >             - SQL parse failed. Encountered "(" at line 2,
>          >               column 256. Was expecting one of:
>          >                  "NOT" ...
>          >                  "NULL" ...
>          >                  ">" ...
>          >                  "MULTISET" ...
>          >                  "ARRAY" ...
>          >                  "." ...
>          >
>          >             It looks like the planner doesn't like the round
>          >             brackets on the LEGACY type. What is the correct way
>          >             to register a table with this type in Flink?
>          >             --
>          >             Best Regards,
>          >             Yuval Itzchakov.
>          >
>          >
>          >
>          >         --
>          >         Best Regards,
>          >         Yuval Itzchakov.
>          >
>
>
>
>     --
>     Best Regards,
>     Yuval Itzchakov.
>
>
>
> --
> Best Regards,
> Yuval Itzchakov.