Hi, i am using avro 1.9.1 + Flink 1.10.1 + Confluent Kafka 5.5. In Intellij, I can see the FlinkKafkaConsumer already deserialized the upstream kafka message. However, I got below error when this message is serialized during pushToOperator. Per the stack trace, the reason is that AvroSerializer is created by AvroFactory.fromSpecific() which creates its private copy of specificData. This private specificData does not have logical type information. This blocks the deserialized messages from being passed to downstream operators. Any idea how to make this work? Appreciated very much! org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:887) at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420) at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:850) at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280) at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199) at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261) at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199) at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280) at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199) at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261) at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199) at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) |
Hi, Could it be related to https://issues.apache.org/jira/browse/FLINK-18223 ? Also maybe as a workaround, is it working if you enable object reuse (`StreamExecutionEnvironment#getConfig()#enableObjectReuse())`)? Best regards Piotrek śr., 16 wrz 2020 o 08:09 Lian Jiang <[hidden email]> napisał(a):
|
In reply to this post by Lian Jiang
Hi,
Could you share exactly how do you configure avro & kafka? Do you use Table API or DataStream API? Do you use the ConfluentRegistryDeserializationSchema that comes with Flink or did you built custom DeserializationSchema? Could you maybe share the code for instantiating the source with us? It could help us track down the problematic spot. Best, Dawid On 16/09/2020 08:09, Lian Jiang wrote: > Hi, > > i am using avro 1.9.1 + Flink 1.10.1 + Confluent Kafka 5.5. In > Intellij, I can see the FlinkKafkaConsumer already deserialized the > upstream kafka message. However, I got below error when this message > is serialized during pushToOperator. Per the stack trace, the reason > is that AvroSerializer is created by AvroFactory.fromSpecific() which > creates its private copy of specificData. This private specificData > does not have logical type information. This blocks the deserialized > messages from being passed to downstream operators. Any idea how to > make this work? Appreciated very much! > > > org.apache.avro.AvroRuntimeException: Unknown datum type > java.time.Instant: 2020-09-15T07:00:00Z > at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:887) > at > org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420) > at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:850) > at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280) > at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199) > at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261) > at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199) > at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280) > at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199) > at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261) > at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199) > at > org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) > at > org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) signature.asc (849 bytes) Download Attachment |
Piotr/Dawid, Thanks for the reply. FLINK-18223 seems not to related to this issue and I double checked that I am using Flink 1.11.0 instead of 1.10.0. My mistake. StreamExecutionEnvironment#getConfig()#enableObjectReuse()) solved the issue. I am not using ConfluentRegistryDeserializationSchema. Instead, I am creating custom DeserializationSchema: /* the deser class */ public class SpecificRecordSerDe<T extends SpecificRecord> implements KafkaSerializationSchema<T>, KafkaContextAware<T>, KafkaDeserializationSchema<T>, Serializable { @Override /* how we use above deser class SpecificRecordSerDe<PayloadRecord> deserializer = new SpecificRecordSerDe<>( Thanks Lian On Thu, Sep 17, 2020 at 2:19 AM Dawid Wysakowicz <[hidden email]> wrote: Hi, -- |
Thanks for the update. First of all, why did you decide to build your own
DeserializationSchema instead of using
ConfluentRegistryDeserializationSchema? Your implementation is
quite inefficient you do deserialize > serialize >
deserialize. Serialization/deserialization is usually one of the
heaviest operations in the pipeline. What do you return in your getProducedType? From the stack trace I guess you are instantiating the AvroTypeInfo? Could you maybe share a full runnable example? It would make it much easier to help you. Moreover the pattern with registering custom conversions in a SpecificData will not work with AvroSerializer. Custom serializers should be defined in the generated SpecificRecord (in your case PayloadRecord) in the SpecificRecordBase#getConversion(). Best, Dawid
On 17/09/2020 16:34, Lian Jiang wrote:
signature.asc (849 bytes) Download Attachment |
Thanks Dawid for proposing ConfluentRegistryDeserializationSchema. I am trying ConfluentRegistryAvroDeserializationSchema (if this is what you mean) but got "java.lang.Long cannot be cast to java.time.Instant". This may be caused by https://issues.apache.org/jira/browse/FLINK-11030. Is there any progress for this JIRA? Thanks. Regards! Stacktrace: java.lang.ClassCastException: java.lang.Long cannot be cast to java.time.Instant at com.mycompany.mypayload.MetadataRecord.put(MetadataRecord.java:136) at org.apache.avro.generic.GenericData.setField(GenericData.java:795) at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139) at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237) at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178) at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136) at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237) at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144) at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:74) at com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:89) at com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:16) at org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema.deserialize(KafkaDeserializationSchema.java:80) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
On Thu, Sep 17, 2020 at 9:42 AM Dawid Wysakowicz <[hidden email]> wrote:
-- |
Hi Lian, we had a similar discussion on [1]. TL;DR you are using Avro 1.9.x while Flink still bundles Avro 1.8 [2] until Hive bumps it [3]. In the thread, I gave some options to avoid running into the issue. The easiest fix is to use Avro 1.8.2 all the way, but you may run into [4] if your logical type is nullable (which is not necessary in most cases). Still, I think it's time for us to revise the decision to wait for Hive to bump and rather upgrade independently. Avro was for a long time stuck on 1.8 but the project gained traction again in the past two years. On the other hand, Hive seems to be rather slow to respond to that and we shouldn't have a slow moving component block us to support a fast moving component if it's such apparent that users want it. [hidden email] could you please pick that topic up and ping the respective maintainers? On Sun, Sep 20, 2020 at 9:56 PM Lian Jiang <[hidden email]> wrote:
-- Arvid Heise | Senior Java Developer Follow us @VervericaData -- Join Flink Forward - The Apache Flink Conference Stream Processing | Event Driven | Real Time -- Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany -- Ververica GmbHRegistered at Amtsgericht Charlottenburg: HRB 158244 B Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng |
Hey Arvid, Just a quick comment to Arvid's mail for now. It should be safe to update the Avro version even if we've been declaring dependency on Avro 1.8.2 by default. Moreover up until now we do not bundle any version of Avro in any of the uber jars we ship. It is true we used Avro version 1.8.2 by default because that's the version that hadoop ships with (the hadoop distributions really bundle avro dependency as part of their binaries). As for the other issue, because Hadoop is no longer the most frequent environment Flink is run and as you said they are not the fastest with upgrading dependencies we decided to upgrade the default Avro version that Flink declares. From Flink 1.12 by default we depend on Avro 1.10 It has already been merged into master[1]. Still users should be able to downgrade the avro version if they need. (If they have specific records generated with older versions or they use hadoop.) @Lian Will look further into the issue. My suspicion though is
there is a problem with the Conversions your generated class
declares. In order for Flink to handle logical types correctly the
generated Avro class must return valid Conversions via
SpecificRecord#getConversions(). Could you share the avro schema
and the generated class? Without the full picture it will be hard
to track down the problem. Again best would be an example that I
could run. Best, Dawid [1] https://issues.apache.org/jira/browse/FLINK-18192 On 21/09/2020 08:04, Arvid Heise wrote:
signature.asc (849 bytes) Download Attachment |
In reply to this post by Arvid Heise-3
Hi All,
Avro was finally bumped in https://issues.apache.org/jira/browse/FLINK-18192. The implementers didn't see https://issues.apache.org/jira/browse/FLINK-12532, but it is also updated now. Best, Aljoscha On 21.09.20 08:04, Arvid Heise wrote: > Hi Lian, > > we had a similar discussion on [1]. > > TL;DR you are using Avro 1.9.x while Flink still bundles Avro 1.8 [2] until > Hive bumps it [3]. In the thread, I gave some options to avoid running into > the issue. > The easiest fix is to use Avro 1.8.2 all the way, but you may run into [4] > if your logical type is nullable (which is not necessary in most cases). > > Still, I think it's time for us to revise the decision to wait for Hive to > bump and rather upgrade independently. Avro was for a long time stuck on > 1.8 but the project gained traction again in the past two years. On the > other hand, Hive seems to be rather slow to respond to that and we > shouldn't have a slow moving component block us to support a fast moving > component if it's such apparent that users want it. > @Aljoscha Krettek <[hidden email]> could you please pick that topic up > and ping the respective maintainers? > > [1] > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reading-from-AVRO-files-td35850.html > [2] https://issues.apache.org/jira/browse/FLINK-12532 > [3] https://issues.apache.org/jira/browse/HIVE-21737 > [4] https://issues.apache.org/jira/browse/AVRO-1891 > > On Sun, Sep 20, 2020 at 9:56 PM Lian Jiang <[hidden email]> wrote: > >> Thanks Dawid for proposing ConfluentRegistryDeserializationSchema. I am >> trying ConfluentRegistryAvroDeserializationSchema (if this is what you >> mean) but got "java.lang.Long cannot be cast to java.time.Instant". This >> may be caused by https://issues.apache.org/jira/browse/FLINK-11030. >> <https://issues.apache.org/jira/browse/FLINK-11030> Is there any progress >> for this JIRA? Thanks. Regards! >> >> >> Stacktrace: >> java.lang.ClassCastException: java.lang.Long cannot be cast to >> java.time.Instant >> at com.mycompany.mypayload.MetadataRecord.put(MetadataRecord.java:136) >> at org.apache.avro.generic.GenericData.setField(GenericData.java:795) >> at >> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139) >> at >> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237) >> at >> org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) >> at >> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170) >> at >> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) >> at >> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178) >> at >> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136) >> at >> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237) >> at >> org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123) >> at >> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170) >> at >> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) >> at >> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144) >> at >> org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:74) >> at >> com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:89) >> at >> com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:16) >> at >> org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema.deserialize(KafkaDeserializationSchema.java:80) >> at >> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) >> at >> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) >> at >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) >> at >> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) >> at >> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) >> at >> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) >> >> >> >> Code: >> >> import org.apache.avro.specific.SpecificRecord; >> import org.apache.flink.api.common.typeinfo.TypeInformation; >> import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema; >> import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema; >> import org.apache.flink.streaming.connectors.kafka.KafkaContextAware; >> import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; >> import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema; >> import org.apache.kafka.clients.consumer.ConsumerRecord; >> import org.apache.kafka.clients.producer.ProducerRecord; >> >> import javax.annotation.Nullable; >> import java.io.Serializable; >> >> public class SpecificRecordSerDe<T extends SpecificRecord> implements >> KafkaSerializationSchema<T>, KafkaContextAware<T>, KafkaDeserializationSchema<T>, Serializable { >> >> private final Class<T> tClass; >> private String topic; // for serializer >> private String subject; // for serializer >> private final String schemaRegistryUrl; >> private ConfluentRegistryAvroSerializationSchema<T> serializer; >> private ConfluentRegistryAvroDeserializationSchema<T> deserializer; >> >> private static final Object lock = new Object(); >> >> public static <T> SpecificRecordSerDe forDeserializer(final Class<T> tClass, String schemaRegistryUrl) { >> return new SpecificRecordSerDe(tClass, schemaRegistryUrl); >> } >> >> public static <T> SpecificRecordSerDe forSerializer(final Class<T> tClass, >> String schemaRegistryUrl, >> final String topic, >> final String subject) { >> return new SpecificRecordSerDe(tClass, schemaRegistryUrl, topic, subject); >> } >> >> private SpecificRecordSerDe(final Class<T> tClass, String schemaRegistryUrl) { >> this.tClass = tClass; >> this.schemaRegistryUrl = schemaRegistryUrl; >> } >> >> private SpecificRecordSerDe(final Class<T> tClass, >> final String schemaRegistryUrl, >> final String topic, >> final String subject) { >> this(tClass, schemaRegistryUrl); >> this.topic = topic; >> this.subject = subject; >> } >> >> @Override >> public ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp) { >> if (this.serializer == null) { >> synchronized (lock) { >> if (this.serializer == null) { >> this.serializer = ConfluentRegistryAvroSerializationSchema >> .forSpecific(tClass, this.subject, this.schemaRegistryUrl); >> } >> } >> } >> >> byte[] bytes = this.serializer.serialize(element); >> return new ProducerRecord<>(this.topic, bytes); >> } >> >> public boolean isEndOfStream(T nextElement) { >> return false; >> } >> >> @Override >> public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception { >> if (deserializer == null) { >> synchronized (lock) { >> if (deserializer == null) { >> deserializer = ConfluentRegistryAvroDeserializationSchema >> .forSpecific(tClass, this.schemaRegistryUrl); >> } >> } >> } >> >> return deserializer.deserialize(record.value()); >> } >> >> @Override >> public String getTargetTopic(T element) { >> return this.topic; >> } >> >> @Override >> public TypeInformation<T> getProducedType() { >> return TypeInformation.of(tClass); >> } >> } >> >> >> >> >> On Thu, Sep 17, 2020 at 9:42 AM Dawid Wysakowicz <[hidden email]> >> wrote: >> >>> Thanks for the update. >>> >>> First of all, why did you decide to build your own DeserializationSchema >>> instead of using ConfluentRegistryDeserializationSchema? Your >>> implementation is quite inefficient you do deserialize > serialize > >>> deserialize. Serialization/deserialization is usually one of the heaviest >>> operations in the pipeline. >>> >>> What do you return in your getProducedType? From the stack trace I guess >>> you are instantiating the AvroTypeInfo? Could you maybe share a full >>> runnable example? It would make it much easier to help you. >>> >>> Moreover the pattern with registering custom conversions in a >>> SpecificData will not work with AvroSerializer. Custom serializers should >>> be defined in the generated SpecificRecord (in your case PayloadRecord) in >>> the SpecificRecordBase#getConversion(). >>> >>> Best, >>> >>> Dawid >>> >>> >>> On 17/09/2020 16:34, Lian Jiang wrote: >>> >>> Piotr/Dawid, >>> >>> Thanks for the reply. FLINK-18223 seems not to related to this issue and >>> I double checked that I am using Flink 1.11.0 instead of 1.10.0. My >>> mistake. StreamExecutionEnvironment#getConfig()#enableObjectReuse()) solved >>> the issue. >>> >>> I am not using ConfluentRegistryDeserializationSchema. Instead, I am >>> creating custom DeserializationSchema: >>> >>> >>> /* >>> the deser class >>> */ >>> public class SpecificRecordSerDe<T extends SpecificRecord> implements >>> >>> KafkaSerializationSchema<T>, KafkaContextAware<T>, KafkaDeserializationSchema<T>, Serializable { >>> private final Class<T> tClass;private final String tSchemaStr;private volatile transient Schema tSchema;private String topic;private String schemaRegistryUrl;private KafkaAvroSerializer serializer;private KafkaAvroDecoder decoder; >>> public SpecificRecordSerDe(final Class<T> tClass, String tSchemaStr, String schemaRegistryUrl) { >>> this.tClass = tClass; >>> this.tSchemaStr = tSchemaStr; >>> this.topic = null; >>> this.schemaRegistryUrl = schemaRegistryUrl; >>> } >>> >>> @Overridepublic T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception { CachedSchemaRegistryClient client = new CachedSchemaRegistryClient( >>> schemaRegistryUrl, >>> 4); >>> decoder = new KafkaAvroDecoder(client); >>> GenericRecord generic = (GenericRecord) decoder.fromBytes(record.value()); DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(generic.getSchema(), ManagedSpecificData.getForClass(tClass)); >>> ByteArrayOutputStream out = new ByteArrayOutputStream(); >>> Encoder encoder = EncoderFactory.get().binaryEncoder(out, null); >>> writer.write(generic, encoder); >>> encoder.flush(); >>> >>> byte[] avroData = out.toByteArray(); >>> out.close(); >>> tSchema = new org.apache.avro.Schema.Parser().parse(tSchemaStr); >>> SpecificDatumReader<T> reader = new SpecificDatumReader<>( >>> generic.getSchema(), tSchema, ManagedSpecificData.getForClass(tClass)); >>> Decoder anotherDecoder = DecoderFactory.get().binaryDecoder(avroData, null); >>> T res = reader.read(null, anotherDecoder); >>> >>> return res; >>> } >>> } >>> >>> >>> /* >>> the specificData class >>> */public class ManagedSpecificData extends SpecificData { private static ManagedSpecificData getManagedSpecificData() { >>> ManagedSpecificData res = new ManagedSpecificData(); >>> >>> registerAdvancedType(new TimestampMillisType(), res); >>> registerAdvancedType(new LocalDateType(), res); >>> >>> return res; >>> }} >>> >>> >>> /* >>> >>> how we use above deser class >>> */ >>> >>> SpecificRecordSerDe<PayloadRecord> deserializer = new SpecificRecordSerDe<>( >>> PayloadRecord.class, >>> PayloadRecord.getClassSchema().toString(), >>> this.schemaRegistry); >>> FlinkKafkaConsumer consumer = new FlinkKafkaConsumer( >>> this.inputTopic, >>> deserializer, >>> this.sourceSettings); >>> >>> >>> >>> Thanks >>> >>> Lian >>> >>> >>> >>> >>> On Thu, Sep 17, 2020 at 2:19 AM Dawid Wysakowicz <[hidden email]> >>> wrote: >>> >>>> Hi, >>>> >>>> Could you share exactly how do you configure avro & kafka? Do you use >>>> Table API or DataStream API? Do you use the >>>> ConfluentRegistryDeserializationSchema that comes with Flink or did you >>>> built custom DeserializationSchema? Could you maybe share the code for >>>> instantiating the source with us? It could help us track down the >>>> problematic spot. >>>> >>>> Best, >>>> >>>> Dawid >>>> >>>> On 16/09/2020 08:09, Lian Jiang wrote: >>>>> Hi, >>>>> >>>>> i am using avro 1.9.1 + Flink 1.10.1 + Confluent Kafka 5.5. In >>>>> Intellij, I can see the FlinkKafkaConsumer already deserialized the >>>>> upstream kafka message. However, I got below error when this message >>>>> is serialized during pushToOperator. Per the stack trace, the reason >>>>> is that AvroSerializer is created by AvroFactory.fromSpecific() which >>>>> creates its private copy of specificData. This private specificData >>>>> does not have logical type information. This blocks the deserialized >>>>> messages from being passed to downstream operators. Any idea how to >>>>> make this work? Appreciated very much! >>>>> >>>>> >>>>> org.apache.avro.AvroRuntimeException: Unknown datum type >>>>> java.time.Instant: 2020-09-15T07:00:00Z >>>>> at >>>> org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:887) >>>>> at >>>>> >>>> org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420) >>>>> at >>>> org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:850) >>>>> at >>>> org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280) >>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199) >>>>> at >>>> org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261) >>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199) >>>>> at >>>> org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280) >>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199) >>>>> at >>>> org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261) >>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199) >>>>> at >>>>> >>>> org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242) >>>>> at >>>>> >>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715) >>>>> at >>>>> >>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) >>>>> at >>>>> >>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) >>>>> at >>>>> >>>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) >>>>> at >>>>> >>>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) >>>>> at >>>>> >>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) >>>>> at >>>>> >>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) >>>>> at >>>>> >>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) >>>>> at >>>>> >>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) >>>>> at >>>>> >>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) >>>>> at >>>>> >>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) >>>>> at >>>>> >>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) >>>> >>>> >>> >>> -- >>> >>> >>> >>> Create your own email signature >>> <https://www.wisestamp.com/signature-in-email/?utm_source=promotion&utm_medium=signature&utm_campaign=create_your_own&srcid=5234462839406592> >>> >>> >> >> -- >> >> Create your own email signature >> <https://www.wisestamp.com/signature-in-email/?utm_source=promotion&utm_medium=signature&utm_campaign=create_your_own&srcid=5234462839406592> >> > > |
Thanks guys. Given Flink 1.12 is not ready (e.g. not available in Maven repo), I need to stick to 1.11. Dawid, For the code throwing "java.lang.Long cannot be cast to java.time.Instant", The avro schema has: union {null, timestamp_ms } eventTime = null; The avro pojo does have the logical type conversion: private static SpecificData MODEL$ = new SpecificData(); The pojo code throws: I will send the full avdl and pojo offline to you for a close look. Regards Lian On Mon, Sep 21, 2020 at 12:31 AM Aljoscha Krettek <[hidden email]> wrote: Hi All, -- |
Hi Lian, Thank you for sending the full code for the pojo. It clarified a lot! I learnt that Avro introduced yet another mechanism for
retrieving conversions for logical types in Avro 1.9.x. I was not
aware they create a static SpecificData field with registered
logical conversions if a logical type is part of a union. That's
why I did not understand the parts of the you sent me where you
are registering the logical types in the MODEL$ field. The
getConversion method is part of the SpecificRecordBase class and
is being populated by Avro compiler when a logical type is a top
level type. This bit should work just fine. Unfortunately we do not support this "feature" of using the
static SpecificData field. So far we create the SpecificData
manually in AvroSerializer and Avro(De)SerializationSchema that is
why the conversions are not being picked up. I created a JIRA
issue[1] and a PR[2] to support it in Flink 1.12. The only workaround I can see in earlier versions of Flink is to change the AvroSerializer manually. You would need to do a similar thing as I do in the linked PR. Best, Dawid [1] https://issues.apache.org/jira/browse/FLINK-19339 [2] https://github.com/apache/flink/pull/13450 On 21/09/2020 19:28, Lian Jiang wrote:
signature.asc (849 bytes) Download Attachment |
Dawid, Thanks for the fix. I may wait for Flink 1.12 coming out at the end of Oct this year. Meanwhile, I may want to better understand the current solution at the beginning of this thread. My observations: 1. ProcessFunction with streamEnv.getConfig().enableObjectReuse() --> working 2. ProcessFunction without streamEnv.getConfig().enableObjectReuse() --> Not working at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:912) at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:413) at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:874) at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1293) at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1224) at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1286) at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1224) at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1293) at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1224) at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1286) at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1224) at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:243) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) 3. KeyedProcessFunction with streamEnv.getConfig().enableObjectReuse() --> Not working Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2020-09-21T19:52:58.477Z The reason why 1 works is because AvroSerializer is created with the class used by the sink operator. This class does not have any logical types. The reason why 2 and 3 do not work is that AvroSerializer is created with the class used by the source operator FlinkKafkaConsumer. This class has logical types. So the logical type conversion issue is hidden (instead of resolved) in 1. In other words, enableObjectReuse with processFunction will still fail when the class Question: 1. Before Flink 1.12 comes out, how do I apply Dawid's fix? If I need to rebuild Flink, it may not work since our CICD may not support a private Flink build. Also, there are other dependencies coupled with Flink version and it will be hard to manually manage them without downloading from maven repo. 2. What does enableObjectReuse exactly do? Why it causes processFunction's AvroSerializer to use the class in the source operator while keyedProcessFunction's AvroSerializer to use the class in the sink operator? If I can make keyedProcessFunction (or keyed window function) work using enableObjectReuse, this workaround is still valuable for me in the short term. Thanks Lian On Tue, Sep 22, 2020 at 12:22 AM Dawid Wysakowicz <[hidden email]> wrote:
-- |
Hi Lian, sorry for the late reply. 1. All serialization related functions are just implementation of API interfaces. As such, you can implement serializers yourself. In this case, you could simply copy the code from 1.12 into your application. You may adjust a few things that are different between 1.11 and 1.12 though. 2. enableObjectReuse avoids copying of records between chained operators. The chain ends with any keyby. A possible workaround is to enableObjectReuse and then convert the datum with logical types into a datum without logical types if possible (potentially dropping a few fields to reduce network traffic along the way). On Wed, Sep 23, 2020 at 6:11 PM Lian Jiang <[hidden email]> wrote:
-- Arvid Heise | Senior Java Developer Follow us @VervericaData -- Join Flink Forward - The Apache Flink Conference Stream Processing | Event Driven | Real Time -- Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany -- Ververica GmbHRegistered at Amtsgericht Charlottenburg: HRB 158244 B Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng |
Free forum by Nabble | Edit this page |