Re: Registering RawType with LegacyTypeInfo via Flink CREATE TABLE DDL

Posted by Yuval Itzchakov on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Registering-RawType-with-LegacyTypeInfo-via-Flink-CREATE-TABLE-DDL-tp40289p40340.html

I see, alright. 

Thank you for all the help!

On Mon, Dec 28, 2020 at 3:57 PM Timo Walther <[hidden email]> wrote:
Hi Yuval,

FLIP-136 will resolve the remaining legacy type issues for the DataStream
API. After that we can drop legacy types because everything can be
expressed with DataType. DataType is a supertype of TypeInformation.

Registering types (e.g. giving a RAW type backed by io.circe.Json a name
so that it can be used in a DDL statement) is also in the backlog but
requires another FLIP.

Regards,
Timo


On 28.12.20 13:04, Yuval Itzchakov wrote:
> Hi Timo,
> I will look at the Catalog interface and see what it requires.
>
> In the meantime I'll go back to the old API. Do you expect FLIP-136 to
> resolve the issue around legacy types? Will its implementation allow
> registering LEGACY types, or a new variation of them?
>
> On Mon, Dec 28, 2020 at 12:45 PM Timo Walther <[hidden email]> wrote:
>
>     I would recommend using the old UDF stack for now. You can simply call
>     `StreamTableEnvironment.registerFunction` instead of
>     `createTemporarySystemFunction`. The UDF then returns a legacy type
>     that the DataStream API understands.
>
>     Have you thought about implementing your own catalog instead of
>     generating CREATE TABLE statements? The catalog API seems a bit complex
>     at first glance, but it only requires implementing a couple of methods.
>     You can implement your own `CatalogTable`, which is the parsed
>     representation of a `CREATE TABLE` statement. That would give you
>     full control over the entire type stack end-to-end.
>
>     Regards,
>     Timo
>
>     On 28.12.20 10:36, Yuval Itzchakov wrote:
>      > Timo, an additional question.
>      >
>      > I am currently using TypeConversions.fromLegacyInfoToDataType. However,
>      > this still does not allow me to register this table with the catalog,
>      > since no representation of LEGACY is supported (I don't see it
>      > generating any DataType other than RAW(.., LEGACY)). Is there any
>      > way I can preserve the underlying type and still register the column
>      > somehow with CREATE TABLE?
>      >
>      > On Mon, Dec 28, 2020 at 11:22 AM Yuval Itzchakov <[hidden email]> wrote:
>      >
>      >     Hi Timo,
>      >
>      >     Thanks for the explanation. Passing around a byte array is not
>      >     possible since I need to know the concrete type later for
>      >     serialization in my sink, so I need to keep the type.
>      >
>      >     What I'm trying to achieve is the following:
>      >     I have a scalar UDF:
>      >
>      >     [image.png: screenshot of the UDF definition, not preserved in the archive]
>      >
>      >     This function is later used when processing a Flink table, by
>      >     calling PARSE_JSON on the selected field.
>      >     Whoever uses these UDFs isn't aware of any Flink syntax for creating
>      >     and registering tables; I generate the CREATE TABLE statement for
>      >     each table dynamically, behind the scenes. For this to work, I need
>      >     the UDF to output the correct type (in this example, io.circe.Json).
>      >     Later, this JSON type (or any other type returned by the UDF, for
>      >     that matter) will get serialized in a custom sink and written into a
>      >     data warehouse (that's why I need to keep the concrete type, since
>      >     serialization happens at the edges, before writing it out).
>      >
>      >     The reason I'm converting between Table and DataStream is that this
>      >     table will be further manipulated before being written to the custom
>      >     DynamicTableSink by applying transformations on a DataStream[Row],
>      >     and then converted back to a Table to be used in an additional
>      >     CREATE TABLE and then INSERT INTO statement.
>      >
>      >     This is why I have these conversions back and forth, and why I
>      >     somehow need a way to register the legacy types as a valid type of
>      >     the table.
>      >     Hope that clarifies things a bit. Since the pipeline is rather
>      >     complex, I can't really share an MVCE of it.
>      >
>      >     On Mon, Dec 28, 2020 at 11:08 AM Timo Walther <[hidden email]> wrote:
>      >
>      >         Hi Yuval,
>      >
>      >         The legacy type has no string representation that can be used in
>      >         a SQL DDL statement. The current string representation LEGACY(...)
>      >         is only a temporary workaround to persist the old types in catalogs.
>      >
>      >         Until FLIP-136 is fully implemented, toAppendStream/toRetractStream
>      >         support only legacy type info. So I would recommend using the
>      >         legacy type in the UDF return type as well. Either use the old
>      >         `getResultType` method, or override `getTypeInference` and call
>      >         `TypeConversions.fromLegacyInfoToDataType`.
>      >
>      >         Another workaround could be to simply use `BYTES` as the return
>      >         type and pass around a byte array instead.
>      >
>      >         Maybe you can show us a little end-to-end example of what you are
>      >         trying to achieve?
>      >
>      >         Regards,
>      >         Timo
>      >
>      >
>      >         On 28.12.20 07:47, Yuval Itzchakov wrote:
>      >          > Hi Danny,
>      >          >
>      >          > Yes, I tried implementing the DataTypeFactory for the UDF using
>      >          > TypeInformationRawType (which is deprecated, BTW, and there's no
>      >          > support for RawType in the conversion), but it didn't help.
>      >          >
>      >          > I did manage to get the conversion working using
>      >          > TableEnvironment.toAppendStream (I was previously calling
>      >          > TypeConversions directly), but the problem remains that Flink
>      >          > can't register LEGACY types via the CREATE TABLE DDL.
>      >          >
>      >          > On Mon, Dec 28, 2020, 04:25 Danny Chan <[hidden email]> wrote:
>      >          >
>      >          >      > SQL parse failed. Encount
>      >          >     What syntax did you use?
>      >          >
>      >          >      > TypeConversions.fromDataTypeToLegacyInfo cannot convert a plain
>      >          >      > RAW type back to TypeInformation.
>      >          >
>      >          >     Did you try constructing the type information with a fresh
>      >          >     TypeInformationRawType?
>      >          >
>      >          >     Yuval Itzchakov <[hidden email]> wrote on Thu, Dec 24, 2020 at 7:24 PM:
>      >          >
>      >          >         An expansion to my question:
>      >          >
>      >          >         What I really want is for the UDF to return the
>      >          >         `RAW(io.circe.Json, ?)` type, but I have to do a conversion
>      >          >         between Table and DataStream, and
>      >          >         TypeConversions.fromDataTypeToLegacyInfo cannot
>      >          >         convert a plain RAW type back to TypeInformation.
>      >          >
>      >          >         On Thu, Dec 24, 2020 at 12:59 PM Yuval Itzchakov <[hidden email]> wrote:
>      >          >
>      >          >             Hi,
>      >          >
>      >          >             I have a UDF which returns a type of MAP<STRING,
>      >          >             LEGACY('RAW', 'ANY<io.circe.Json>')>. When I try to
>      >          >             register this type with Flink via the CREATE TABLE DDL,
>      >          >             I encounter an exception:
>      >          >
>      >          >             - SQL parse failed. Encountered "(" at line 2, column 256.
>      >          >             Was expecting one of:
>      >          >                  "NOT" ...
>      >          >                  "NULL" ...
>      >          >                  ">" ...
>      >          >                  "MULTISET" ...
>      >          >                  "ARRAY" ...
>      >          >                  "." ...
>      >          >
>      >          >             It looks like the planner doesn't like the round
>      >          >             brackets on the LEGACY type. What is the correct way to
>      >          >             register the table with this type with Flink?
>      >          >             --
>      >          >             Best Regards,
>      >          >             Yuval Itzchakov.
>      >          >
>      >          >
>      >          >
>      >          >         --
>      >          >         Best Regards,
>      >          >         Yuval Itzchakov.
>      >          >
>      >
>      >
>      >
>      >     --
>      >     Best Regards,
>      >     Yuval Itzchakov.
>      >
>      >
>      >
>      > --
>      > Best Regards,
>      > Yuval Itzchakov.
>
>
>
> --
> Best Regards,
> Yuval Itzchakov.



--
Best Regards,
Yuval Itzchakov.
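
The thread describes generating `CREATE TABLE` statements dynamically, and notes that `LEGACY(...)` has no string representation the SQL DDL parser accepts. As a rough illustration only, the sketch below builds such a statement in plain Java and rejects `LEGACY(...)` column types before the statement ever reaches Flink. The class `DdlSketch`, the method `buildCreateTable`, the table and column names, and the `'connector' = 'print'` option are all hypothetical placeholders, not taken from the thread or from Flink's API. The `MAP<STRING, BYTES>` column stands in for the `BYTES` workaround mentioned above, which, as noted in the thread, loses the concrete type.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not part of Flink): builds a CREATE TABLE statement
// from column and option metadata, and fails early on LEGACY(...) types.
class DdlSketch {

    static String buildCreateTable(String table,
                                   Map<String, String> columns,
                                   Map<String, String> options) {
        StringJoiner cols = new StringJoiner(", ");
        for (Map.Entry<String, String> col : columns.entrySet()) {
            String type = col.getValue();
            // Per the thread, LEGACY(...) cannot be parsed in SQL DDL,
            // so reject it here instead of hitting "SQL parse failed".
            if (type.toUpperCase(Locale.ROOT).contains("LEGACY(")) {
                throw new IllegalArgumentException(
                        "Column '" + col.getKey()
                                + "' uses a LEGACY type with no DDL representation: " + type);
            }
            cols.add("`" + col.getKey() + "` " + type);
        }

        StringJoiner opts = new StringJoiner(", ");
        for (Map.Entry<String, String> opt : options.entrySet()) {
            opts.add("'" + opt.getKey() + "' = '" + opt.getValue() + "'");
        }

        return "CREATE TABLE `" + table + "` (" + cols + ") WITH (" + opts + ")";
    }

    public static void main(String[] args) {
        // Insertion-ordered maps keep the generated column order stable.
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("id", "STRING");
        columns.put("payload", "MAP<STRING, BYTES>"); // BYTES fallback instead of LEGACY

        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "print"); // placeholder connector option

        System.out.println(buildCreateTable("events", columns, options));
    }
}
```

The generated string could then be handed to the table environment's SQL entry point. Whether `BYTES` is acceptable depends on whether the sink can recover the concrete type some other way, which is exactly the limitation raised in the thread.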