Alternatives to TypeConversions fromDataTypeToLegacyInfo / fromLegacyInfoToDataType


Alternatives to TypeConversions fromDataTypeToLegacyInfo / fromLegacyInfoToDataType

Yuval Itzchakov
When upgrading to Flink 1.13, I ran into deprecation warnings on TypeConversions

[screenshot of the deprecation warnings on TypeConversions]

The deprecation message states that this API will be deprecated soon, but does not mention the alternatives that can be used for these transformations.

My use case is that I have a table that needs to be converted into a DataStream[Row], on which I then need to apply some stateful transformations. To do that, I need the produced TypeInformation[Row] to pass into the various state functions.
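
(For context, the call in question looks roughly like the sketch below; the wrapper is made up for illustration and assumes the row DataType has already been obtained from the table's schema.)

object LegacyBridgeSketch {
  import org.apache.flink.api.common.typeinfo.TypeInformation
  import org.apache.flink.table.types.DataType
  import org.apache.flink.table.types.utils.TypeConversions
  import org.apache.flink.types.Row

  // Deprecated bridge: turn a row DataType into a legacy TypeInformation[Row]
  // that can then be handed to stateful DataStream operators.
  def legacyRowTypeInfo(rowDataType: DataType): TypeInformation[Row] =
    TypeConversions
      .fromDataTypeToLegacyInfo(rowDataType)
      .asInstanceOf[TypeInformation[Row]]
}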

@Timo Walther [hidden email], I would love your help on this.
--
Best Regards,
Yuval Itzchakov.

Re: Alternatives to TypeConversions fromDataTypeToLegacyInfo / fromLegacyInfoToDataType

Timo Walther
Hi Yuval,


TypeConversions.fromDataTypeToLegacyInfo was only a utility to bridge
between TypeInformation and DataType until TypeInformation is no longer
exposed through the Table API.

Beginning with Flink 1.13, the Table API is able to serialize the records
to the first DataStream operator via toDataStream or toChangelogStream.
Internally, it uses
org.apache.flink.table.runtime.typeutils.ExternalTypeInfo for that. The
binary representation uses internal data structures, and conversion
is performed during serialization/deserialization:

conversion -> internal -> conversion

You have two possibilities:

1) You simply call `tableEnv.toDataStream(table).getType()` and pass
this type on to the next operator.

2) You define your own TypeInformation as you would usually do in the
DataStream API without the Table API.
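
A rough sketch of what option 1 can look like (illustrative only; it assumes the Java bridge StreamTableEnvironment from Flink 1.13, and the state-descriptor usage is a made-up example of passing the type on):

import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
import org.apache.flink.types.Row

object Option1Sketch {
  // Let toDataStream() decide the TypeInformation (an ExternalTypeInfo)
  // and reuse exactly that type downstream, e.g. for a state descriptor.
  def toStatefulStream(tableEnv: StreamTableEnvironment, table: Table): DataStream[Row] = {
    val stream: DataStream[Row] = tableEnv.toDataStream(table)
    val rowTypeInfo: TypeInformation[Row] = stream.getType

    // Hypothetical downstream state that reuses the same type information:
    val lastRowState = new ValueStateDescriptor[Row]("lastRow", rowTypeInfo)
    // stream.keyBy(...).process(new MyStatefulFunction(lastRowState)) ...
    stream
  }
}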

We might serialize `Row`s with `RowSerializer` again in the near future.
But for now we went with the most generic solution that supports
everything that can come out of the Table API.

Regards,
Timo


Re: Alternatives to TypeConversions fromDataTypeToLegacyInfo / fromLegacyInfoToDataType

Yuval Itzchakov
Hi Timo,
Thank you for the response.

In reality, the tables being created are based on arbitrary SQL code, so I don't know what the schema actually is and cannot create the TypeInformation "by hand" to pass on to the DataStream API.

This leaves me with option 1, which leads to another question: if I have some state records stored in RocksDB from a currently running job on a previous Flink version (1.9.3), will changing the TypeInformation from TypeInformation[Row] to the ExternalTypeInfo break the compatibility of the state currently stored and essentially cause it to be lost? My gut feeling says yes, unless some form of backwards compatibility is going to be written specifically for this use case.




Re: Alternatives to TypeConversions fromDataTypeToLegacyInfo / fromLegacyInfoToDataType

Timo Walther
Hi Yuval,

I would recommend option 2, because especially when it comes to state you
should be in control of what is persisted. There is no guarantee that the
ExternalSerializer will not change in the future. It is only meant for
shipping data as the input of the next operator.

I would recommend writing a little tool that traverses
`table.getResolvedSchema().toSourceDataType().getLogicalType()`, limits
the types that you allow in state (e.g. no structured types), and
translates them to TypeInformation.
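
A minimal sketch of such a tool (not from the thread; it assumes Flink 1.13's ResolvedSchema API, uses toSourceRowDataType() rather than the toSourceDataType() mentioned above, and whitelists only a few basic field types):

import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.table.api.Table
import org.apache.flink.table.types.logical.{LogicalType, LogicalTypeRoot, RowType}
import org.apache.flink.types.Row
import scala.collection.JavaConverters._

object StateTypeInfoSketch {
  // Translate a table's logical row type into a plain TypeInformation[Row],
  // allowing only a small whitelist of field types in state.
  def toStateTypeInfo(table: Table): TypeInformation[Row] = {
    val rowType = table.getResolvedSchema
      .toSourceRowDataType
      .getLogicalType
      .asInstanceOf[RowType]

    val fieldNames = rowType.getFieldNames.asScala.toArray
    val fieldTypes = rowType.getFields.asScala.map(f => toTypeInfo(f.getType)).toArray

    Types.ROW_NAMED(fieldNames, fieldTypes: _*)
  }

  // Reject anything not explicitly allowed (e.g. structured/nested types).
  private def toTypeInfo(t: LogicalType): TypeInformation[_] =
    t.getTypeRoot match {
      case LogicalTypeRoot.CHAR | LogicalTypeRoot.VARCHAR => Types.STRING
      case LogicalTypeRoot.INTEGER                        => Types.INT
      case LogicalTypeRoot.BIGINT                         => Types.LONG
      case LogicalTypeRoot.DOUBLE                         => Types.DOUBLE
      case LogicalTypeRoot.BOOLEAN                        => Types.BOOLEAN
      case other =>
        throw new IllegalArgumentException(s"Type not allowed in state: $other")
    }
}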

Regards,
Timo




Re: Alternatives to TypeConversions fromDataTypeToLegacyInfo / fromLegacyInfoToDataType

Yuval Itzchakov
Thanks Timo 🙏
