How to specify a key serializer


How to specify a key serializer

ChangZhuo Chen (陳昌倬)
Hi,

Currently we use sbt-avrohugger [0] to generate the key class for keyed
state.  The class generated by sbt-avrohugger is both a case class and
an Avro specific record. However, in the following scenarios, Flink
uses different serializers:


* In a streaming application, Flink uses CaseClassSerializer for the
  key class.
* In a State Processor API application, Flink uses AvroSerializer for
  the key class.


Since they use different serializers for the key, the two are not
compatible. Is there any way to specify the key serializer so that both
applications use the same one?



[0] https://github.com/julianpeeters/sbt-avrohugger

--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


Re: How to specify a key serializer

Tzu-Li (Gordon) Tai
Hi CZ,

The issue here is that the Scala DataStream API uses Scala macros to decide which serializer to use. Since those macros recognize Scala case classes, the CaseClassSerializer will be used.
However, in the State Processor API, those macros do not come into play, so Flink falls back to its type extraction for Java classes, which recognizes the key as an Avro-generated class.
In general, the State Processor API currently does not support savepoints written by Scala DataStream jobs very well.

You can try using the TypeInfo annotation to specify a TypeInfoFactory for your key class [1].
This allows you to "plug in" the TypeInformation that Flink extracts for a given class. That custom TypeInformation should return the serializer you want both applications to use.
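A minimal sketch of this approach. `MyKey` here is a hypothetical stand-in for the avrohugger-generated class (in practice you would need to get the annotation onto the generated source, or attach the factory some other way), and returning the case-class TypeInformation is just one possible choice of shared serializer:

```scala
import java.lang.reflect.Type
import java.util.{Map => JMap}

import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInfoFactory, TypeInformation}
import org.apache.flink.api.scala.createTypeInformation

// Hypothetical stand-in for the sbt-avrohugger generated key class.
// The @TypeInfo annotation makes Flink's type extraction consult the
// factory instead of picking a serializer on its own.
@TypeInfo(classOf[MyKeyTypeInfoFactory])
case class MyKey(id: String, tenant: String)

class MyKeyTypeInfoFactory extends TypeInfoFactory[MyKey] {
  override def createTypeInfo(
      t: Type,
      genericParameters: JMap[String, TypeInformation[_]]): TypeInformation[MyKey] =
    // Return the TypeInformation whose serializer both the streaming job
    // and the State Processor API job should share, e.g. the case-class one.
    createTypeInformation[MyKey]
}
```

With this in place, both the DataStream job and the State Processor API job resolve the same TypeInformation, and therefore the same key serializer.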

Best,
Gordon

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#defining-type-information-using-a-factory


Re: How to specify a key serializer

ChangZhuo Chen (陳昌倬)
On Wed, Mar 31, 2021 at 05:33:19PM +0800, Tzu-Li (Gordon) Tai wrote:
> You can try using the TypeInfo annotation to specify a TypeInfoFactory
> for your key class [1].
> This allows you to "plug in" the TypeInformation that Flink extracts
> for a given class. That custom TypeInformation should return the
> serializer you want both applications to use.

Hi Gordon,

Thanks for the tip. We have solved the problem by specifying the
TypeInformation explicitly in readKeyedState.
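For readers hitting the same issue, a sketch of what that fix might look like. The uid, savepoint path, and the `MyKey`/`MyState`/`MyReaderFunction` names are placeholders, not the poster's actual code; the point is the readKeyedState overload that takes explicit TypeInformation arguments, which bypasses Flink's own type extraction for the key:

```scala
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.state.api.Savepoint

val env = ExecutionEnvironment.getExecutionEnvironment
val savepoint = Savepoint.load(env, "/path/to/savepoint", new MemoryStateBackend())

// MyReaderFunction extends KeyedStateReaderFunction[MyKey, MyState].
// Passing the TypeInformation explicitly forces the State Processor API
// to use the same (case-class) serializer as the streaming job, instead
// of extracting an Avro-based one on its own.
val states = savepoint.readKeyedState(
  "my-operator-uid",
  new MyReaderFunction(),
  createTypeInformation[MyKey],
  createTypeInformation[MyState])
```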

