http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Registering-RawType-with-LegacyTypeInfo-via-Flink-CREATE-TABLE-DDL-tp40289p40330.html
I would recommend to use the old UDF stack for now. You can simply call
`createTemporarySystemFunction`. Then the UDF returns a legacy type that
at first glance but only requires to implement a couple of methods. In
representation of a `CREATE TABLE` statement. In this case you would
> Timo, an additional question.
>
> I am currently using TypeConversions.fromLegacyInfoToDataType. However,
> this still does not allow me to register this table with the catalog
> since any representation of LEGACY isn't supported (I don't see it
> generating any other DataType other than RAW(.., LEGACY). Is there any
> way I can preserve the underlying type and still register the column
> somehow with CREATE TABLE?
>
> On Mon, Dec 28, 2020 at 11:22 AM Yuval Itzchakov <
[hidden email]
> <mailto:
[hidden email]>> wrote:
>
> Hi Timo,
>
> Thanks for the explanation. Passing around a byte array is not
> possible since I need to know the concrete type later for
> serialization in my sink, so I need to keep the type.
>
> What I'm trying to achieve is the following:
> I have a scalar UDF function:
>
> image.png
>
> This function is later used in processing of a Flink table, by
> calling PARSE_JSON on the selected field.
> Whoever uses these UDFs isn't aware of any Flink syntax for creating
> and registering tables, I generate the CREATE TABLE statement behind
> the scenes, dynamically.
> for each table. In order for this to work, I need the UDF to output
> the correct type (in this example, io.circe.Json). Later, this JSON
> type (or any other type returned by the UDF for that matter) will
> get serialized in a custom sink and into a data warehouse (that's
> why I need to keep the concrete type, since serialization happens at
> the edges before writing it out).
>
> The reason I'm converting between Table and DataStream is that this
> table will be further manipulated before being written to the custom
> DynamicTableSink by applying transformations on a DataStream[Row],
> and then converted back to a Table to be used in an additional
> CREATE TABLE and then INSERT INTO statement.
>
> This is why I have these conversions back and forth, and why I
> somehow need a way to register the legacy types as a valid type of
> the table.
> Hope that clarifies a bit, since the pipeline is rather complex I
> can't really share a MVCE of it.
>
> On Mon, Dec 28, 2020 at 11:08 AM Timo Walther <
[hidden email]
> <mailto:
[hidden email]>> wrote:
>
> Hi Yuval,
>
> the legacy type has no string representation that can be used in
> a SQL
> DDL statement. The current string representation LEGACY(...) is
> only a
> temporary work around to persist the old types in catalogs.
>
> Until FLIP-136 is fully implemented, toAppendStream/toRetractStream
> support only legacy type info. So I would recommend to use the
> legacy
> type in the UDF return type as well. Either you use the old
> `getResultType` method or you override `getTypeInference` and call
> `TypeConversions.fromLegacyInfoToDataType`.
>
> Another work around could be that you simply use `BYTES` as the
> return
> type and pass around a byte array instead.
>
> Maybe you can show us a little end-to-end example, what you are
> trying
> to achieve?
>
> Regards,
> Timo
>
>
> On 28.12.20 07:47, Yuval Itzchakov wrote:
> > Hi Danny,
> >
> > Yes, I tried implementing the DataTypeFactory for the UDF using
> > TypeInformationRawType (which is deprecated BTW, and there's
> no support
> > for RawType in the conversion), didn't help.
> >
> > I did manage to get the conversion working using
> > TableEnvironment.toAppendStream (I was previously directly
> calling
> > TypeConversions) but still remains the problem that Flink
> can't register
> > LEGACY types via the CREATE TABLE DDL
> >
> > On Mon, Dec 28, 2020, 04:25 Danny Chan <
[hidden email]
> <mailto:
[hidden email]>
> > <mailto:
[hidden email] <mailto:
[hidden email]>>>
> wrote:
> >
> > > SQL parse failed. Encount
> > What syntax did you use ?
> >
> > > TypeConversions.fromDataTypeToLegacyInfo cannot
> convert a plain
> > RAW type back to TypeInformation.
> >
> > Did you try to construct type information by a new
> > fresh TypeInformationRawType ?
> >
> > Yuval Itzchakov <
[hidden email]
> <mailto:
[hidden email]> <mailto:
[hidden email]
> <mailto:
[hidden email]>>> 于
> > 2020年12月24日周四 下午7:24写道:
> >
> > An expansion to my question:
> >
> > What I really want is for the UDF to return
> `RAW(io.circe.Json,
> > ?)` type, but I have to do a conversion between Table and
> > DataStream, and
> TypeConversions.fromDataTypeToLegacyInfo cannot
> > convert a plain RAW type back to TypeInformation.
> >
> > On Thu, Dec 24, 2020 at 12:59 PM Yuval Itzchakov
> > <
[hidden email] <mailto:
[hidden email]>
> <mailto:
[hidden email] <mailto:
[hidden email]>>> wrote:
> >
> > Hi,
> >
> > I have a UDF which returns a type of MAP<STRING,
> > LEGACY('RAW', 'ANY<io.circe.Json>')>. When I try
> to register
> > this type with Flink via the CREATE TABLE DDL, I
> encounter
> > an exception:
> >
> > - SQL parse failed. Encountered "(" at line 2,
> column 256.
> > Was expecting one of:
> > "NOT" ...
> > "NULL" ...
> > ">" ...
> > "MULTISET" ...
> > "ARRAY" ...
> > "." ...
> >
> > Which looks like the planner doesn't like the
> round brackets
> > on the LEGACY type. What is the correct way to
> register the
> > table with this type with Flink?
> > --
> > Best Regards,
> > Yuval Itzchakov.
> >
> >
> >
> > --
> > Best Regards,
> > Yuval Itzchakov.
> >
>
>
>
> --
> Best Regards,
> Yuval Itzchakov.
>
>
>
> --
> Best Regards,
> Yuval Itzchakov.