Hi,
Currently we use sbt-avrohugger [0] to generate the key class for keyed state. The key class generated by sbt-avrohugger is both a case class and an Avro specific record. However, Flink uses different serializers in the following scenarios:

* In a streaming application, Flink uses CaseClassSerializer for the key class.
* In a State Processor API application, Flink uses AvroSerializer for the key class.

Since the two applications use different serializers for the key, they are not compatible. Is there any way to specify the key serializer so that both applications use the same one?

[0] https://github.com/julianpeeters/sbt-avrohugger

--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790 8793 CC65 B0CD EC27 5D5B

Hi CZ,

The issue here is that the Scala DataStream API uses Scala macros to decide which serializer to use. Since the macros recognize Scala case classes, the CaseClassSerializer is used. In the State Processor API, however, those Scala macros do not come into play, so it goes directly to Flink's type extraction for Java classes, which recognizes the key class as an Avro-generated class.

In general, the State Processor API currently doesn't support savepoints written by Scala DataStream jobs that well.

You can try using TypeInfo annotations to specify a TypeInformationFactory for your key class [1]. This allows you to "plug in" the TypeInformation that Flink extracts for a given class. That custom TypeInformation should return the correct serializer.

Best,
Gordon

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#defining-type-information-using-a-factory

On Mon, Mar 29, 2021 at 2:42 PM ChangZhuo Chen (陳昌倬) <[hidden email]> wrote:
> Hi,

On Wed, Mar 31, 2021 at 05:33:19PM +0800, Tzu-Li (Gordon) Tai wrote:
> You can try using TypeInfo annotations to specify a TypeInformationFactory
> for your key class [1].
> This allows you to "plug-in" the TypeInformation extracted by Flink for a
> given class. In that custom TypeInformation, you should let it return the
> correct serializer.

Hi Gordon,

Thanks for the tip. We have solved the problem by specifying the TypeInformation in readKeyedState.

--
ChangZhuo Chen (陳昌倬)
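
For readers who land on this thread: the fix described above might look roughly like the following. This is a minimal sketch rather than the poster's actual code, assuming a Flink 1.12-era State Processor API (DataSet based). `MyKey`, `CountReader`, the state name `"count"`, the operator uid, and the savepoint path are all hypothetical placeholders; the real key class would be the one generated by sbt-avrohugger. The idea is to let the Scala macro `createTypeInformation` build the same case-class type information the streaming job used, and pass it explicitly to `readKeyedState` so that Flink's Java type extraction (and therefore AvroSerializer) is not used for the key.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.state.api.Savepoint
import org.apache.flink.state.api.functions.KeyedStateReaderFunction
import org.apache.flink.util.Collector

// Hypothetical stand-in for the key class generated by sbt-avrohugger.
case class MyKey(id: String, shard: Int)

// Hypothetical reader: emits (key, count) for a ValueState named "count".
class CountReader extends KeyedStateReaderFunction[MyKey, (MyKey, Long)] {
  @transient private var count: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    count = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("count", classOf[java.lang.Long]))
  }

  override def readKey(
      key: MyKey,
      ctx: KeyedStateReaderFunction.Context,
      out: Collector[(MyKey, Long)]): Unit = {
    out.collect((key, count.value().longValue()))
  }
}

object ReadKeyedStateExample {
  def main(args: Array[String]): Unit = {
    // Savepoint.load expects the Java (DataSet) ExecutionEnvironment.
    val env = JavaEnv.getExecutionEnvironment

    // Placeholder path and state backend; adjust to the real job.
    val savepoint =
      Savepoint.load(env, "hdfs:///path/to/savepoint", new MemoryStateBackend())

    // The Scala macro yields case-class type information, matching the
    // CaseClassSerializer that the Scala DataStream job used for the key.
    val keyInfo: TypeInformation[MyKey] = createTypeInformation[MyKey]
    val outInfo: TypeInformation[(MyKey, Long)] = createTypeInformation[(MyKey, Long)]

    // Passing keyInfo explicitly bypasses Java type extraction for the key.
    savepoint
      .readKeyedState("my-operator-uid", new CountReader, keyInfo, outInfo)
      .print()
  }
}
```

Whether the state descriptor inside the reader also needs explicit type information depends on how the streaming job declared its state; the sketch only addresses the key serializer discussed in this thread.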