http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Registering-RawType-with-LegacyTypeInfo-via-Flink-CREATE-TABLE-DDL-tp40289p40339.html
API. After that we can drop legacy types because everything can be
expressed with DataType. DataType is a super type of TypeInformation.
Registering types (e.g. giving a RAW type backed by io.circe.Json a name
> Hi Timo,
> I will look at the Catalog interface and see what it requires.
>
> In the meantime I'll go back to the old API. Do you expect FLIP-136 to
> resolve the issue around legacy types? Will its implementation allow
> registering LEGACY types, or a new variation of them?
>
> On Mon, Dec 28, 2020 at 12:45 PM Timo Walther <[hidden email]> wrote:
>
> I would recommend using the old UDF stack for now. You can simply call
> `StreamTableEnvironment.registerFunction` instead of
> `createTemporarySystemFunction`. Then the UDF returns a legacy type that
> the DataStream API understands.
>
> Have you thought about implementing your own catalog instead of
> generating CREATE TABLE statements? The catalog API seems a bit complex
> at first glance but only requires implementing a couple of methods. You
> can implement your own `CatalogTable`, which is the parsed
> representation of a `CREATE TABLE` statement. That way you would have
> full control over the entire type stack end-to-end.
>
> Regards,
> Timo
>
> On 28.12.20 10:36, Yuval Itzchakov wrote:
> > Timo, an additional question.
> >
> > I am currently using TypeConversions.fromLegacyInfoToDataType. However,
> > this still does not allow me to register this table with the catalog,
> > since no representation of LEGACY is supported (I don't see it
> > generating any DataType other than RAW(.., LEGACY)). Is there any
> > way I can preserve the underlying type and still register the column
> > somehow with CREATE TABLE?
> >
> > On Mon, Dec 28, 2020 at 11:22 AM Yuval Itzchakov <[hidden email]> wrote:
> >
> > Hi Timo,
> >
> > Thanks for the explanation. Passing around a byte array is not
> > possible since I need to know the concrete type later for
> > serialization in my sink, so I need to keep the type.
> >
> > What I'm trying to achieve is the following:
> > I have a scalar UDF:
> >
> > [image: image.png]
> >
> > This function is later used in the processing of a Flink table, by
> > calling PARSE_JSON on the selected field.
> > Whoever uses these UDFs isn't aware of any Flink syntax for creating
> > and registering tables; I generate the CREATE TABLE statement for each
> > table dynamically, behind the scenes. For this to work, I need the UDF
> > to output the correct type (in this example, io.circe.Json). Later, this
> > JSON type (or any other type returned by the UDF, for that matter) will
> > get serialized in a custom sink and written into a data warehouse (that's
> > why I need to keep the concrete type, since serialization happens at
> > the edges before writing it out).
> >
> > The reason I'm converting between Table and DataStream is that this
> > table will be further manipulated before being written to the custom
> > DynamicTableSink by applying transformations on a DataStream[Row],
> > and then converted back to a Table to be used in an additional
> > CREATE TABLE and then INSERT INTO statement.
> >
> > This is why I have these conversions back and forth, and why I
> > somehow need a way to register the legacy types as a valid type of
> > the table.
> > I hope that clarifies things a bit. Since the pipeline is rather
> > complex, I can't really share an MVCE of it.
> >
> > On Mon, Dec 28, 2020 at 11:08 AM Timo Walther <[hidden email]> wrote:
> >
> > Hi Yuval,
> >
> > The legacy type has no string representation that can be used in
> > a SQL DDL statement. The current string representation LEGACY(...) is
> > only a temporary workaround to persist the old types in catalogs.
> >
> > Until FLIP-136 is fully implemented, toAppendStream/toRetractStream
> > support only legacy type info. So I would recommend using the legacy
> > type in the UDF return type as well. Either you use the old
> > `getResultType` method or you override `getTypeInference` and call
> > `TypeConversions.fromLegacyInfoToDataType`.
> >
> > Another workaround could be to simply use `BYTES` as the return
> > type and pass around a byte array instead.
> >
> > Maybe you can show us a small end-to-end example of what you are
> > trying to achieve?
> >
> > Regards,
> > Timo
> >
> >
> > On 28.12.20 07:47, Yuval Itzchakov wrote:
> > > Hi Danny,
> > >
> > > Yes, I tried implementing the DataTypeFactory for the UDF using
> > > TypeInformationRawType (which is deprecated BTW, and there's no
> > > support for RawType in the conversion), but it didn't help.
> > >
> > > I did manage to get the conversion working using
> > > TableEnvironment.toAppendStream (I was previously calling
> > > TypeConversions directly), but the problem remains that Flink
> > > can't register LEGACY types via the CREATE TABLE DDL.
> > >
> > > On Mon, Dec 28, 2020, 04:25 Danny Chan <[hidden email]> wrote:
> > >
> > > > SQL parse failed. Encount
> > > What syntax did you use?
> > >
> > > > TypeConversions.fromDataTypeToLegacyInfo cannot convert a plain
> > > > RAW type back to TypeInformation.
> > >
> > > Did you try constructing the type information with a fresh
> > > TypeInformationRawType?
> > >
> > > Yuval Itzchakov <[hidden email]> wrote on Thu, Dec 24, 2020 at 7:24 PM:
> > >
> > > An expansion to my question:
> > >
> > > What I really want is for the UDF to return a `RAW(io.circe.Json, ?)`
> > > type, but I have to do a conversion between Table and DataStream,
> > > and TypeConversions.fromDataTypeToLegacyInfo cannot convert a plain
> > > RAW type back to TypeInformation.
> > >
> > > On Thu, Dec 24, 2020 at 12:59 PM Yuval Itzchakov <[hidden email]> wrote:
> > >
> > > Hi,
> > >
> > > I have a UDF which returns a type of MAP<STRING,
> > > LEGACY('RAW', 'ANY<io.circe.Json>')>. When I try to register
> > > this type with Flink via the CREATE TABLE DDL, I encounter
> > > an exception:
> > >
> > > - SQL parse failed. Encountered "(" at line 2, column 256.
> > > Was expecting one of:
> > > "NOT" ...
> > > "NULL" ...
> > > ">" ...
> > > "MULTISET" ...
> > > "ARRAY" ...
> > > "." ...
> > >
> > > It looks like the planner doesn't like the round brackets
> > > on the LEGACY type. What is the correct way to register the
> > > table with this type with Flink?
> > > --
> > > Best Regards,
> > > Yuval Itzchakov.