Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z


Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z

Lian Jiang
Hi,

I am using Avro 1.9.1 + Flink 1.10.1 + Confluent Kafka 5.5. In IntelliJ, I can see that the FlinkKafkaConsumer has already deserialized the upstream Kafka message. However, I get the error below when this message is copied during pushToOperator. Per the stack trace, the cause is that the AvroSerializer is created by AvroFactory.fromSpecific(), which creates its own private copy of SpecificData. This private SpecificData does not have the logical type information, which blocks the deserialized messages from being passed to downstream operators. Any idea how to make this work? Much appreciated!


org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z
at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:887)
at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:850)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
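For reference, the mechanism can be reproduced with plain Avro. A minimal sketch (not from the original message), assuming Avro 1.9.x and a nullable timestamp-millis field, which matches the resolveUnion frames in the trace:

import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;

public class DeepCopyRepro {
    public static void main(String[] args) {
        // A record with one nullable timestamp-millis field.
        Schema tsSchema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
        Schema union = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), tsSchema));
        Schema schema = Schema.createRecord("Event", null, "example", false,
                Collections.singletonList(new Schema.Field("ts", union, null, (Object) null)));

        GenericRecord rec = new GenericData.Record(schema);
        rec.put("ts", Instant.now()); // the datum is a java.time.Instant

        // A freshly created SpecificData has no logical-type conversions, so
        // resolveUnion() cannot map Instant to the timestamp-millis branch:
        try {
            new SpecificData().deepCopy(schema, rec);
        } catch (org.apache.avro.AvroRuntimeException e) {
            System.out.println(e.getMessage()); // Unknown datum type java.time.Instant: ...
        }

        // With the conversion registered, the same deepCopy succeeds:
        SpecificData withConversions = new SpecificData();
        withConversions.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
        System.out.println(withConversions.deepCopy(schema, rec));
    }
}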

Re: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z

Piotr Nowojski
Hi,

This looks like it could be related to FLINK-18223; could you double-check which exact Flink version you are running?

Also, as a workaround: does it work if you enable object reuse (`StreamExecutionEnvironment#getConfig().enableObjectReuse()`)?
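For context, a minimal sketch of where that switch lives (with object reuse enabled, records passed between chained operators are not defensively copied, so downstream operators must not cache or mutate their inputs):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ObjectReuseExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Skips the defensive TypeSerializer#copy() between chained operators,
        // which is the AvroSerializer#copy()/deepCopy() call failing above.
        env.getConfig().enableObjectReuse();
    }
}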

Best regards 
Piotrek


Re: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z

Dawid Wysakowicz
Hi,

Could you share exactly how you configure Avro & Kafka? Do you use the Table API or the DataStream API? Do you use the ConfluentRegistryDeserializationSchema that comes with Flink, or did you build a custom DeserializationSchema? Could you maybe share the code for instantiating the source with us? It would help us track down the problematic spot.

Best,

Dawid


Re: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z

Lian Jiang
Piotr/Dawid,

Thanks for the reply. FLINK-18223 seems not to be related to this issue, and I double-checked that I am using Flink 1.11.0 instead of 1.10.0; my mistake. StreamExecutionEnvironment#getConfig().enableObjectReuse() solved the issue.

I am not using ConfluentRegistryDeserializationSchema. Instead, I am creating a custom DeserializationSchema:


/*
the deser class
*/
public class SpecificRecordSerDe<T extends SpecificRecord> implements
        KafkaSerializationSchema<T>, KafkaContextAware<T>, KafkaDeserializationSchema<T>, Serializable {

    private final Class<T> tClass;
    private final String tSchemaStr;
    private volatile transient Schema tSchema;
    private String topic;
    private String schemaRegistryUrl;
    private KafkaAvroSerializer serializer;
    private KafkaAvroDecoder decoder;

    public SpecificRecordSerDe(final Class<T> tClass, String tSchemaStr, String schemaRegistryUrl) {
        this.tClass = tClass;
        this.tSchemaStr = tSchemaStr;
        this.topic = null;
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    @Override
    public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(schemaRegistryUrl, 4);

        decoder = new KafkaAvroDecoder(client);
        GenericRecord generic = (GenericRecord) decoder.fromBytes(record.value());
        DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(generic.getSchema(), ManagedSpecificData.getForClass(tClass));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(generic, encoder);
        encoder.flush();

        byte[] avroData = out.toByteArray();
        out.close();

        tSchema = new org.apache.avro.Schema.Parser().parse(tSchemaStr);
        SpecificDatumReader<T> reader = new SpecificDatumReader<>(
                generic.getSchema(), tSchema, ManagedSpecificData.getForClass(tClass));
        Decoder anotherDecoder = DecoderFactory.get().binaryDecoder(avroData, null);
        T res = reader.read(null, anotherDecoder);

        return res;
    }
}


/*
the specificData class
*/
public class ManagedSpecificData extends SpecificData {
    private static ManagedSpecificData getManagedSpecificData() {
        ManagedSpecificData res = new ManagedSpecificData();

        registerAdvancedType(new TimestampMillisType(), res);
        registerAdvancedType(new LocalDateType(), res);

        return res;
    }
}
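For reference, registerAdvancedType and the *Type classes above appear to be project-local helpers; the stock Avro 1.9+ call for registering logical-type conversions on a data model is addLogicalTypeConversion, roughly:

import org.apache.avro.data.TimeConversions;
import org.apache.avro.specific.SpecificData;

public class ConversionsExample {
    public static void main(String[] args) {
        SpecificData data = new SpecificData();
        // Map timestamp-millis <-> java.time.Instant and date <-> java.time.LocalDate:
        data.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
        data.addLogicalTypeConversion(new TimeConversions.DateConversion());
    }
}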

/*
how we use above deser class
*/
SpecificRecordSerDe<PayloadRecord> deserializer = new SpecificRecordSerDe<>(
        PayloadRecord.class,
        PayloadRecord.getClassSchema().toString(),
        this.schemaRegistry);

FlinkKafkaConsumer consumer = new FlinkKafkaConsumer(
        this.inputTopic,
        deserializer,
        this.sourceSettings);


Thanks
Lian







Re: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z

Dawid Wysakowicz

Thanks for the update.

First of all, why did you decide to build your own DeserializationSchema instead of using the ConfluentRegistryDeserializationSchema? Your implementation is quite inefficient: you do deserialize > serialize > deserialize, and serialization/deserialization is usually one of the heaviest operations in the pipeline.

What do you return in your getProducedType? From the stack trace I guess you are instantiating an AvroTypeInfo? Could you maybe share a full runnable example? It would make it much easier to help you.

Moreover, the pattern of registering custom conversions in a SpecificData will not work with the AvroSerializer. Custom conversions should be defined in the generated SpecificRecord (in your case PayloadRecord) via SpecificRecordBase#getConversion().
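For illustration, a minimal hand-written sketch of that shape (the schema, class name, and field layout are invented for this example; real classes come from avro-tools or the avro-maven-plugin):

import java.time.Instant;
import org.apache.avro.Conversion;
import org.apache.avro.Schema;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificRecordBase;

public class PayloadRecordSketch extends SpecificRecordBase {
    public static final Schema SCHEMA$ = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"PayloadRecordSketch\",\"fields\":["
            + "{\"name\":\"ts\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}}]}");

    // Generated classes carry their data model in a MODEL$ field;
    // SpecificData.getForClass(PayloadRecordSketch.class) finds it reflectively.
    private static final SpecificData MODEL$ = new SpecificData();
    static {
        MODEL$.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
    }

    // One slot per field; a non-null entry tells Avro how to map long <-> Instant.
    private static final Conversion<?>[] conversions =
            new Conversion<?>[] { new TimeConversions.TimestampMillisConversion() };

    private Instant ts;

    @Override
    public Conversion<?> getConversion(int field) {
        return conversions[field];
    }

    @Override
    public Schema getSchema() { return SCHEMA$; }

    @Override
    public Object get(int field) { return ts; }

    @Override
    public void put(int field, Object value) { ts = (Instant) value; }
}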

Best,

Dawid



Re: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z

Lian Jiang
Thanks Dawid for proposing ConfluentRegistryDeserializationSchema. I am trying ConfluentRegistryAvroDeserializationSchema (if this is what you mean) but got "java.lang.Long cannot be cast to java.time.Instant". This may be caused by https://issues.apache.org/jira/browse/FLINK-11030. Is there any progress on this JIRA? Thanks!


Stacktrace:
java.lang.ClassCastException: java.lang.Long cannot be cast to java.time.Instant
at com.mycompany.mypayload.MetadataRecord.put(MetadataRecord.java:136)
at org.apache.avro.generic.GenericData.setField(GenericData.java:795)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:74)
at com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:89)
at com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:16)
at org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema.deserialize(KafkaDeserializationSchema.java:80)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)


Code:

import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.io.Serializable;

public class SpecificRecordSerDe<T extends SpecificRecord> implements
        KafkaSerializationSchema<T>, KafkaContextAware<T>, KafkaDeserializationSchema<T>, Serializable {

    private final Class<T> tClass;
    private String topic; // for serializer
    private String subject; // for serializer
    private final String schemaRegistryUrl;
    // volatile so the double-checked lazy initialization below is safe:
    private volatile ConfluentRegistryAvroSerializationSchema<T> serializer;
    private volatile ConfluentRegistryAvroDeserializationSchema<T> deserializer;

    private static final Object lock = new Object();

    public static <T extends SpecificRecord> SpecificRecordSerDe<T> forDeserializer(final Class<T> tClass,
                                                                                    final String schemaRegistryUrl) {
        return new SpecificRecordSerDe<>(tClass, schemaRegistryUrl);
    }

    public static <T extends SpecificRecord> SpecificRecordSerDe<T> forSerializer(final Class<T> tClass,
                                                                                  final String schemaRegistryUrl,
                                                                                  final String topic,
                                                                                  final String subject) {
        return new SpecificRecordSerDe<>(tClass, schemaRegistryUrl, topic, subject);
    }

    private SpecificRecordSerDe(final Class<T> tClass, String schemaRegistryUrl) {
        this.tClass = tClass;
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    private SpecificRecordSerDe(final Class<T> tClass,
                                final String schemaRegistryUrl,
                                final String topic,
                                final String subject) {
        this(tClass, schemaRegistryUrl);
        this.topic = topic;
        this.subject = subject;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp) {
        if (this.serializer == null) {
            synchronized (lock) {
                if (this.serializer == null) {
                    this.serializer = ConfluentRegistryAvroSerializationSchema
                            .forSpecific(tClass, this.subject, this.schemaRegistryUrl);
                }
            }
        }

        byte[] bytes = this.serializer.serialize(element);
        return new ProducerRecord<>(this.topic, bytes);
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        if (deserializer == null) {
            synchronized (lock) {
                if (deserializer == null) {
                    deserializer = ConfluentRegistryAvroDeserializationSchema
                            .forSpecific(tClass, this.schemaRegistryUrl);
                }
            }
        }

        return deserializer.deserialize(record.value());
    }

    @Override
    public String getTargetTopic(T element) {
        return this.topic;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeInformation.of(tClass);
    }
}
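For completeness, a sketch of how this class might be wired into a job (topic name, registry URL, and properties are placeholders; PayloadRecord stands for the generated class mentioned earlier in the thread):

import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class WiringExample {
    public static void main(String[] args) {
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        kafkaProps.setProperty("group.id", "example");                 // placeholder

        SpecificRecordSerDe<PayloadRecord> deserializer =
                SpecificRecordSerDe.forDeserializer(PayloadRecord.class, "http://schema-registry:8081");

        FlinkKafkaConsumer<PayloadRecord> consumer =
                new FlinkKafkaConsumer<>("input-topic", deserializer, kafkaProps);
    }
}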




Re: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z

Arvid Heise
Hi Lian,

we had a similar discussion on [1].

TL;DR: you are using Avro 1.9.x while Flink still bundles Avro 1.8 [2] until Hive bumps it [3]. In that thread, I gave some options to avoid running into the issue.
The easiest fix is to use Avro 1.8.2 all the way, but you may run into [4] if your logical type is nullable (which is not necessary in most cases).

Still, I think it's time for us to revise the decision to wait for Hive to bump its Avro version and rather upgrade independently. Avro was stuck on 1.8 for a long time, but the project has gained traction again in the past two years. On the other hand, Hive seems to be rather slow to respond to that, and we shouldn't let a slow-moving component block us from supporting a fast-moving component when it's so apparent that users want it.
[hidden email] could you please pick that topic up and ping the respective maintainers?



Re: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z

Dawid Wysakowicz

Hey Arvid,

Just a quick comment on Arvid's mail for now. It should be safe to update the Avro version even though we've been declaring a dependency on Avro 1.8.2 by default. Moreover, up until now we have not bundled any version of Avro in any of the uber jars we ship. It is true we used Avro 1.8.2 by default, but that's because it is the version that Hadoop ships with (the Hadoop distributions really do bundle the Avro dependency as part of their binaries).

As for the other issue: because Hadoop is no longer the most frequent environment Flink runs in, and, as you said, they are not the fastest with upgrading dependencies, we decided to upgrade the default Avro version that Flink declares. From Flink 1.12, we depend on Avro 1.10 by default; this has already been merged into master [1]. Still, users should be able to downgrade the Avro version if they need to (if they have specific records generated with older versions, or they use Hadoop).

@Lian I will look further into the issue. My suspicion, though, is that there is a problem with the conversions your generated class declares. In order for Flink to handle logical types correctly, the generated Avro class must return valid conversions via SpecificRecordBase#getConversion(). Could you share the Avro schema and the generated class? Without the full picture it will be hard to track down the problem. Again, best would be an example that I could run.
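A quick way to check this (a sketch assuming PayloadRecord is the generated class and field 0 is the Instant-typed field):

import org.apache.avro.Conversion;

public class ConversionCheck {
    public static void main(String[] args) {
        PayloadRecord record = new PayloadRecord(); // the generated class
        // A correctly generated class returns e.g. a TimestampMillisConversion here;
        // null means it was generated without logical-type conversions.
        Conversion<?> c = record.getConversion(0);
        System.out.println(c);
    }
}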

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-18192

On 21/09/2020 08:04, Arvid Heise wrote:
Hi Lian,

we had a similar discussion on [1].

TL;DR you are using Avro 1.9.x while Flink still bundles Avro 1.8 [2] until Hive bumps it [3]. In the thread, I gave some options to avoid running into the issue.
The easiest fix is to use Avro 1.8.2 all the way, but you may run into [4] if your logical type is nullable (which is not necessary in most cases).

Still, I think it's time for us to revise the decision to wait for Hive to bump and rather upgrade independently. Avro was for a long time stuck on 1.8 but the project gained traction again in the past two years. On the other hand, Hive seems to be rather slow to respond to that and we shouldn't have a slow moving component block us to support a fast moving component if it's such apparent that users want it.
[hidden email] could you please pick that topic up and ping the respective maintainers?


On Sun, Sep 20, 2020 at 9:56 PM Lian Jiang <[hidden email]> wrote:
Thanks Dawid for proposing ConfluentRegistryDeserializationSchema. I am trying ConfluentRegistryAvroDeserializationSchema (if this is what you mean) but got "java.lang.Long cannot be cast to java.time.Instant". This may be caused by https://issues.apache.org/jira/browse/FLINK-11030. Is there any progress for this JIRA? Thanks. Regards!


Stacktrace:
java.lang.ClassCastException: java.lang.Long cannot be cast to java.time.Instant
at com.mycompany.mypayload.MetadataRecord.put(MetadataRecord.java:136)
at org.apache.avro.generic.GenericData.setField(GenericData.java:795)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:74)
at com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:89)
at com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:16)
at org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema.deserialize(KafkaDeserializationSchema.java:80)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)

Code:

import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.io.Serializable;

public class SpecificRecordSerDe<T extends SpecificRecord> implements
        KafkaSerializationSchema<T>, KafkaContextAware<T>, KafkaDeserializationSchema<T>, Serializable {

    private final Class<T> tClass;
    private String topic; // for serializer
    private String subject; // for serializer
    private final String schemaRegistryUrl;
    // volatile so the double-checked lazy initialization below is safe
    private volatile ConfluentRegistryAvroSerializationSchema<T> serializer;
    private volatile ConfluentRegistryAvroDeserializationSchema<T> deserializer;

    private static final Object lock = new Object();

    public static <T extends SpecificRecord> SpecificRecordSerDe<T> forDeserializer(final Class<T> tClass, String schemaRegistryUrl) {
        return new SpecificRecordSerDe<>(tClass, schemaRegistryUrl);
    }

    public static <T extends SpecificRecord> SpecificRecordSerDe<T> forSerializer(final Class<T> tClass,
                                                                                  String schemaRegistryUrl,
                                                                                  final String topic,
                                                                                  final String subject) {
        return new SpecificRecordSerDe<>(tClass, schemaRegistryUrl, topic, subject);
    }

    private SpecificRecordSerDe(final Class<T> tClass, String schemaRegistryUrl) {
        this.tClass = tClass;
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    private SpecificRecordSerDe(final Class<T> tClass,
                                final String schemaRegistryUrl,
                                final String topic,
                                final String subject) {
        this(tClass, schemaRegistryUrl);
        this.topic = topic;
        this.subject = subject;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp) {
        if (this.serializer == null) {
            synchronized (lock) {
                if (this.serializer == null) {
                    this.serializer = ConfluentRegistryAvroSerializationSchema
                            .forSpecific(tClass, this.subject, this.schemaRegistryUrl);
                }
            }
        }

        byte[] bytes = this.serializer.serialize(element);
        return new ProducerRecord<>(this.topic, bytes);
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        if (deserializer == null) {
            synchronized (lock) {
                if (deserializer == null) {
                    deserializer = ConfluentRegistryAvroDeserializationSchema
                            .forSpecific(tClass, this.schemaRegistryUrl);
                }
            }
        }

        return deserializer.deserialize(record.value());
    }

    @Override
    public String getTargetTopic(T element) {
        return this.topic;
    }

    @Override
    public TypeInformation<T> getProducedType() {
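        // Assumption (not stated in the thread): for a generated class extending
        // SpecificRecordBase, TypeInformation.of(...) resolves to an AvroTypeInfo,
        // the type behind the AvroSerializer seen in the stack traces above.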
        return TypeInformation.of(tClass);
    }
}



On Thu, Sep 17, 2020 at 9:42 AM Dawid Wysakowicz <[hidden email]> wrote:

Thanks for the update.

First of all, why did you decide to build your own DeserializationSchema instead of using ConfluentRegistryDeserializationSchema? Your implementation is quite inefficient: you do deserialize > serialize > deserialize, and serialization/deserialization is usually one of the heaviest operations in the pipeline.

What do you return in your getProducedType? From the stack trace I guess you are instantiating the AvroTypeInfo? Could you maybe share a full runnable example? It would make it much easier to help you.

Moreover, the pattern of registering custom conversions in a SpecificData will not work with the AvroSerializer. Custom conversions should be defined in the generated SpecificRecord (in your case PayloadRecord) via SpecificRecordBase#getConversion().
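
For illustration, here is a minimal hand-written sketch of the shape the Avro 1.9 compiler emits when timestamp_ms is the top-level type of a field rather than a union member (the class name and schema are hypothetical):

import org.apache.avro.Conversion;
import org.apache.avro.Schema;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.specific.SpecificRecordBase;

public class PayloadRecord extends SpecificRecordBase {
    public static final Schema SCHEMA$ = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"PayloadRecord\",\"fields\":["
            + "{\"name\":\"eventTime\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}}]}");

    private java.time.Instant eventTime;

    // one slot per field (plus a trailing record-level slot); the compiler
    // fills the slot of a top-level logical-type field with its conversion
    private static final Conversion<?>[] conversions =
            new Conversion<?>[] { new TimeConversions.TimestampMillisConversion(), null };

    @Override
    public Conversion<?> getConversion(int field) {
        return conversions[field];
    }

    @Override
    public Schema getSchema() { return SCHEMA$; }

    @Override
    public Object get(int field) { return eventTime; }

    @Override
    public void put(int field, Object value) { eventTime = (java.time.Instant) value; }
}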

Best,

Dawid


On 17/09/2020 16:34, Lian Jiang wrote:
Piotr/Dawid,

Thanks for the reply. FLINK-18223 seems not to be related to this issue, and I double-checked that I am using Flink 1.11.0 instead of 1.10.0 (my mistake). StreamExecutionEnvironment#getConfig()#enableObjectReuse() solved the issue.
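
In code, that workaround is a single setting on the execution environment (a minimal sketch):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Skips the defensive deep copy between chained operators; that copy is the
// AvroSerializer#copy call which raised "Unknown datum type java.time.Instant".
env.getConfig().enableObjectReuse();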

I am not using ConfluentRegistryDeserializationSchema. Instead, I am creating custom DeserializationSchema:


/*
the deser class
*/
public class SpecificRecordSerDe<T extends SpecificRecord> implements
        KafkaSerializationSchema<T>, KafkaContextAware<T>, KafkaDeserializationSchema<T>, Serializable {

private final Class<T> tClass;
private final String tSchemaStr;
private volatile transient Schema tSchema;
private String topic;
private String schemaRegistryUrl;
private KafkaAvroSerializer serializer;
private KafkaAvroDecoder decoder;

public SpecificRecordSerDe(final Class<T> tClass, String tSchemaStr, String schemaRegistryUrl) {
    this.tClass = tClass;
    this.tSchemaStr = tSchemaStr;
    this.topic = null;
    this.schemaRegistryUrl = schemaRegistryUrl;
}
@Override
public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
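    // NOTE: this builds a new schema-registry client and round-trips every record
    // through bytes (deserialize > serialize > deserialize); see Dawid's remark
    // above about how costly that is.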
    CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(
        schemaRegistryUrl,
        4);

    decoder = new KafkaAvroDecoder(client);
    GenericRecord generic = (GenericRecord) decoder.fromBytes(record.value());
    DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(generic.getSchema(), ManagedSpecificData.getForClass(tClass));
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    writer.write(generic, encoder);
    encoder.flush();

    byte[] avroData = out.toByteArray();
    out.close();

    tSchema = new org.apache.avro.Schema.Parser().parse(tSchemaStr);
    SpecificDatumReader<T> reader = new SpecificDatumReader<>(
            generic.getSchema(), tSchema, ManagedSpecificData.getForClass(tClass));
    Decoder anotherDecoder = DecoderFactory.get().binaryDecoder(avroData, null);
    T res = reader.read(null, anotherDecoder);

    return res;
}
}


/*
the specificData class
*/
public class ManagedSpecificData extends SpecificData {
    private static ManagedSpecificData getManagedSpecificData() {
        ManagedSpecificData res = new ManagedSpecificData();

        registerAdvancedType(new TimestampMillisType(), res);
        registerAdvancedType(new LocalDateType(), res);

        return res;
    }
}

/*
how we use above deser class
*/
SpecificRecordSerDe<PayloadRecord> deserializer = new SpecificRecordSerDe<>(
        PayloadRecord.class,
        PayloadRecord.getClassSchema().toString(),
        this.schemaRegistry);

FlinkKafkaConsumer<PayloadRecord> consumer = new FlinkKafkaConsumer<>(
        this.inputTopic,
        deserializer,
        this.sourceSettings);


Thanks
Lian



On Thu, Sep 17, 2020 at 2:19 AM Dawid Wysakowicz <[hidden email]> wrote:
Hi,

Could you share exactly how you configure Avro & Kafka? Do you use the
Table API or the DataStream API? Do you use the
ConfluentRegistryDeserializationSchema that comes with Flink or did you
build a custom DeserializationSchema? Could you maybe share the code for
instantiating the source with us? It could help us track down the
problematic spot.

Best,

Dawid



Reply | Threaded
Open this post in threaded view
|

Re: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z

Aljoscha Krettek
In reply to this post by Arvid Heise-3
Hi All,

Avro was finally bumped in
https://issues.apache.org/jira/browse/FLINK-18192.

The implementers didn't see
https://issues.apache.org/jira/browse/FLINK-12532, but it is also
updated now.

Best,
Aljoscha

On 21.09.20 08:04, Arvid Heise wrote:

> Hi Lian,
>
> we had a similar discussion on [1].
>
> TL;DR you are using Avro 1.9.x while Flink still bundles Avro 1.8 [2] until
> Hive bumps it [3]. In the thread, I gave some options to avoid running into
> the issue.
> The easiest fix is to use Avro 1.8.2 all the way, but you may run into [4]
> if your logical type is nullable (which is not necessary in most cases).
>
> Still, I think it's time for us to revise the decision to wait for Hive to
> bump and rather upgrade independently. Avro was for a long time stuck on
> 1.8, but the project gained traction again in the past two years. On the
> other hand, Hive seems to be rather slow to respond to that, and we
> shouldn't let a slow-moving component block us from supporting a
> fast-moving component when it's so apparent that users want it.
> @Aljoscha Krettek <[hidden email]> could you please pick that topic up
> and ping the respective maintainers?
>
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reading-from-AVRO-files-td35850.html
> [2] https://issues.apache.org/jira/browse/FLINK-12532
> [3] https://issues.apache.org/jira/browse/HIVE-21737
> [4] https://issues.apache.org/jira/browse/AVRO-1891
>

Reply | Threaded
Open this post in threaded view
|

Re: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z

Lian Jiang
Thanks, guys. Given that Flink 1.12 is not ready yet (e.g. not available in the Maven repo), I need to stick to 1.11.

Dawid,

For the code throwing "java.lang.Long cannot be cast to java.time.Instant",

The avro schema has:
union {null, timestamp_ms } eventTime = null;
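
For reference, that IDL line should correspond to a JSON schema fragment along these lines (my reconstruction, not taken from the actual schema):

{"name": "eventTime", "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}], "default": null}

i.e. the timestamp-millis logical type sits inside a union rather than being the top-level type of the field.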

The avro pojo does have the logical type conversion:
  private static SpecificData MODEL$ = new SpecificData();
  static {
    MODEL$.addLogicalTypeConversion(new org.apache.avro.data.TimeConversions.TimestampMillisConversion());
  }

I don't see the SpecificRecord#getConversions() you mentioned in the Avro repo.
The pojo code throws here:

public void put(int field$, java.lang.Object value$) {
  switch (field$) {
    case 3: eventTime = (java.time.Instant)value$; break; // throws here
  }
}

I will send the full avdl and pojo offline to you for a close look.


Regards
Lian



Reply | Threaded
Open this post in threaded view
|

Re: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z

Dawid Wysakowicz-2

Hi Lian,

Thank you for sending the full code for the pojo. It clarified a lot!

I learnt that Avro introduced yet another mechanism for retrieving conversions for logical types in Avro 1.9.x. I was not aware that it creates a static SpecificData field with registered logical conversions if a logical type is part of a union. That's why I did not understand the parts of the code you sent me where you are registering the logical types in the MODEL$ field. The getConversion method is part of the SpecificRecordBase class and is populated by the Avro compiler when a logical type is a top-level type. This bit should work just fine.

Unfortunately we do not support this "feature" of using the static SpecificData field. So far we create the SpecificData manually in AvroSerializer and Avro(De)SerializationSchema, which is why the conversions are not being picked up. I created a JIRA issue[1] and a PR[2] to support it in Flink 1.12.

The only workaround I can see in earlier versions of Flink is to change the AvroSerializer manually. You would need to do something similar to what I do in the linked PR.
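
For earlier versions, the core of the idea is roughly this (a hand-written sketch mirroring the approach of the linked PR, not code taken from it; the class and method names are hypothetical):

import java.lang.reflect.Field;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificRecordBase;

final class SpecificDataExtractor {
    private SpecificDataExtractor() {}

    // Returns the SpecificData the Avro compiler generated into the record
    // class (the static MODEL$ field, which carries the registered
    // logical-type conversions), falling back to a plain SpecificData if
    // the class has no such field. A patched AvroSerializer would use this
    // instead of creating a fresh SpecificData without conversions.
    static SpecificData forClass(Class<? extends SpecificRecordBase> clazz) {
        try {
            Field modelField = clazz.getDeclaredField("MODEL$");
            modelField.setAccessible(true);
            return (SpecificData) modelField.get(null); // static field
        } catch (NoSuchFieldException | IllegalAccessException e) {
            return new SpecificData();
        }
    }
}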

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-19339

[2] https://github.com/apache/flink/pull/13450

On 21/09/2020 19:28, Lian Jiang wrote:
Thanks guys. Given Flink 1.12 is not ready (e.g. not available in Maven repo), I need to stick to 1.11.

Dawid,

For the code throwing "java.lang.Long cannot be cast to java.time.Instant",

The avro schema has:
union {null, timestamp_ms } eventTime = null;

The avro pojo does have the logical type conversion:
  private static SpecificData MODEL$ = new SpecificData();
static {
    MODEL$.addLogicalTypeConversion(new org.apache.avro.data.TimeConversions.TimestampMillisConversion());
  }

I don't see SpecificRecord#getConversions() you mentioned in avro repo.
The pojo code throws:
public void put(int field$, java.lang.Object value$) {
  switch (field$) {
  case 3: eventTime = (java.time.Instant)value$; break; // throw here
  }

I will send the full avdl and pojo offline to you for a close look.


Regards
Lian


On Mon, Sep 21, 2020 at 12:31 AM Aljoscha Krettek <[hidden email]> wrote:
Hi All,

Avro was finally bumped in
https://issues.apache.org/jira/browse/FLINK-18192.

The implementers didn't see
https://issues.apache.org/jira/browse/FLINK-12532, but it is also
updated now.

Best,
Aljoscha

On 21.09.20 08:04, Arvid Heise wrote:
> Hi Lian,
>
> we had a similar discussion on [1].
>
> TL;DR you are using Avro 1.9.x while Flink still bundles Avro 1.8 [2] until
> Hive bumps it [3]. In the thread, I gave some options to avoid running into
> the issue.
> The easiest fix is to use Avro 1.8.2 all the way, but you may run into [4]
> if your logical type is nullable (which is not necessary in most cases).
>
> Still, I think it's time for us to revise the decision to wait for Hive to
> bump and rather upgrade independently. Avro was for a long time stuck on
> 1.8 but the project gained traction again in the past two years. On the
> other hand, Hive seems to be rather slow to respond to that and we
> shouldn't have a slow moving component block us to support a fast moving
> component if it's such apparent that users want it.
> @Aljoscha Krettek <[hidden email]> could you please pick that topic up
> and ping the respective maintainers?
>
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reading-from-AVRO-files-td35850.html
> [2] https://issues.apache.org/jira/browse/FLINK-12532
> [3] https://issues.apache.org/jira/browse/HIVE-21737
> [4] https://issues.apache.org/jira/browse/AVRO-1891
>
> On Sun, Sep 20, 2020 at 9:56 PM Lian Jiang <[hidden email]> wrote:
>
>> Thanks Dawid for proposing ConfluentRegistryDeserializationSchema. I am
>> trying ConfluentRegistryAvroDeserializationSchema (if this is what you
>> mean) but got "java.lang.Long cannot be cast to java.time.Instant". This
>> may be caused by https://issues.apache.org/jira/browse/FLINK-11030.
>> <https://issues.apache.org/jira/browse/FLINK-11030> Is there any progress
>> for this JIRA? Thanks. Regards!
>>
>>
>> Stacktrace:
>> java.lang.ClassCastException: java.lang.Long cannot be cast to
>> java.time.Instant
>> at com.mycompany.mypayload.MetadataRecord.put(MetadataRecord.java:136)
>> at org.apache.avro.generic.GenericData.setField(GenericData.java:795)
>> at
>> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139)
>> at
>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
>> at
>> org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
>> at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
>> at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
>> at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
>> at
>> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
>> at
>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
>> at
>> org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
>> at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
>> at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
>> at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
>> at
>> org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:74)
>> at
>> com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:89)
>> at
>> com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:16)
>> at
>> org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema.deserialize(KafkaDeserializationSchema.java:80)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>> at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>> at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>> at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>>
>>
>>
>> Code:
>>
>> import org.apache.avro.specific.SpecificRecord;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
>> import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
>> import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
>> import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
>> import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
>> import org.apache.kafka.clients.consumer.ConsumerRecord;
>> import org.apache.kafka.clients.producer.ProducerRecord;
>>
>> import javax.annotation.Nullable;
>> import java.io.Serializable;
>>
>> public class SpecificRecordSerDe<T extends SpecificRecord> implements
>>          KafkaSerializationSchema<T>, KafkaContextAware<T>, KafkaDeserializationSchema<T>, Serializable {
>>
>>      private final Class<T> tClass;
>>      private String topic; // for serializer
>>      private String subject; // for serializer
>>      private final String schemaRegistryUrl;
>>      private ConfluentRegistryAvroSerializationSchema<T> serializer;
>>      private ConfluentRegistryAvroDeserializationSchema<T> deserializer;
>>
>>      private static final Object lock = new Object();
>>
>>      public static <T> SpecificRecordSerDe forDeserializer(final Class<T> tClass, String schemaRegistryUrl) {
>>          return new SpecificRecordSerDe(tClass, schemaRegistryUrl);
>>      }
>>
>>      public static <T> SpecificRecordSerDe forSerializer(final Class<T> tClass,
>>                                                          String schemaRegistryUrl,
>>                                                          final String topic,
>>                                                          final String subject) {
>>          return new SpecificRecordSerDe(tClass, schemaRegistryUrl, topic, subject);
>>      }
>>
>>      private SpecificRecordSerDe(final Class<T> tClass, String schemaRegistryUrl) {
>>          this.tClass = tClass;
>>          this.schemaRegistryUrl = schemaRegistryUrl;
>>      }
>>
>>      private SpecificRecordSerDe(final Class<T> tClass,
>>                                  final String schemaRegistryUrl,
>>                                  final String topic,
>>                                  final String subject) {
>>          this(tClass, schemaRegistryUrl);
>>          this.topic = topic;
>>          this.subject = subject;
>>      }
>>
>>      @Override
>>      public ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp) {
>>          if (this.serializer == null) {
>>              synchronized (lock) {
>>                  if (this.serializer == null) {
>>                      this.serializer = ConfluentRegistryAvroSerializationSchema
>>                              .forSpecific(tClass, this.subject, this.schemaRegistryUrl);
>>                  }
>>              }
>>          }
>>
>>          byte[] bytes = this.serializer.serialize(element);
>>          return new ProducerRecord<>(this.topic, bytes);
>>      }
>>
>>      public boolean isEndOfStream(T nextElement) {
>>          return false;
>>      }
>>
>>      @Override
>>      public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
>>          if (deserializer == null) {
>>              synchronized (lock) {
>>                  if (deserializer == null) {
>>                      deserializer = ConfluentRegistryAvroDeserializationSchema
>>                              .forSpecific(tClass, this.schemaRegistryUrl);
>>                  }
>>              }
>>          }
>>
>>          return deserializer.deserialize(record.value());
>>      }
>>
>>      @Override
>>      public String getTargetTopic(T element) {
>>          return this.topic;
>>      }
>>
>>      @Override
>>      public TypeInformation<T> getProducedType() {
>>          return TypeInformation.of(tClass);
>>      }
>> }
>>
>>
>>
>>
>> On Thu, Sep 17, 2020 at 9:42 AM Dawid Wysakowicz <[hidden email]>
>> wrote:
>>
>>> Thanks for the update.
>>>
>>> First of all, why did you decide to build your own DeserializationSchema
>>> instead of using ConfluentRegistryDeserializationSchema? Your
>>> implementation is quite inefficient you do deserialize > serialize >
>>> deserialize. Serialization/deserialization is usually one of the heaviest
>>> operations in the pipeline.
>>>
>>> What do you return in your getProducedType? From the stack trace I guess
>>> you are instantiating the AvroTypeInfo? Could you maybe share a full
>>> runnable example? It would make it much easier to help you.
>>>
>>> Moreover, the pattern of registering custom conversions in a
>>> SpecificData will not work with the AvroSerializer. Custom conversions should
>>> be defined in the generated SpecificRecord (in your case PayloadRecord) via
>>> SpecificRecordBase#getConversion().
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>>
>>> On 17/09/2020 16:34, Lian Jiang wrote:
>>>
>>> Piotr/Dawid,
>>>
>>> Thanks for the reply. FLINK-18223 seems not to be related to this issue, and
>>> I double checked that I am using Flink 1.11.0 instead of 1.10.1. My
>>> mistake. StreamExecutionEnvironment#getConfig()#enableObjectReuse() solved
>>> the issue.
>>>
>>> I am not using ConfluentRegistryDeserializationSchema. Instead, I am
>>> creating custom DeserializationSchema:
>>>
>>>
>>> /*
>>> the deser class
>>> */
>>> public class SpecificRecordSerDe<T extends SpecificRecord> implements
>>>         KafkaSerializationSchema<T>, KafkaContextAware<T>, KafkaDeserializationSchema<T>, Serializable {
>>>
>>>     private final Class<T> tClass;
>>>     private final String tSchemaStr;
>>>     private volatile transient Schema tSchema;
>>>     private String topic;
>>>     private String schemaRegistryUrl;
>>>     private KafkaAvroSerializer serializer;
>>>     private KafkaAvroDecoder decoder;
>>>
>>>     public SpecificRecordSerDe(final Class<T> tClass, String tSchemaStr, String schemaRegistryUrl) {
>>>         this.tClass = tClass;
>>>         this.tSchemaStr = tSchemaStr;
>>>         this.topic = null;
>>>         this.schemaRegistryUrl = schemaRegistryUrl;
>>>     }
>>>
>>>     @Override
>>>     public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
>>>         CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(schemaRegistryUrl, 4);
>>>         decoder = new KafkaAvroDecoder(client);
>>>         GenericRecord generic = (GenericRecord) decoder.fromBytes(record.value());
>>>         DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(generic.getSchema(), ManagedSpecificData.getForClass(tClass));
>>>         ByteArrayOutputStream out = new ByteArrayOutputStream();
>>>         Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
>>>         writer.write(generic, encoder);
>>>         encoder.flush();
>>>
>>>         byte[] avroData = out.toByteArray();
>>>         out.close();
>>>         tSchema = new org.apache.avro.Schema.Parser().parse(tSchemaStr);
>>>         SpecificDatumReader<T> reader = new SpecificDatumReader<>(
>>>                 generic.getSchema(), tSchema, ManagedSpecificData.getForClass(tClass));
>>>         Decoder anotherDecoder = DecoderFactory.get().binaryDecoder(avroData, null);
>>>         T res = reader.read(null, anotherDecoder);
>>>
>>>         return res;
>>>     }
>>> }
>>>
>>>
>>> /*
>>> the specificData class
>>> */
>>> public class ManagedSpecificData extends SpecificData {
>>>
>>>     private static ManagedSpecificData getManagedSpecificData() {
>>>         ManagedSpecificData res = new ManagedSpecificData();
>>>
>>>         registerAdvancedType(new TimestampMillisType(), res);
>>>         registerAdvancedType(new LocalDateType(), res);
>>>
>>>         return res;
>>>     }
>>> }
>>>
>>>
>>> /*
>>>
>>> how we use above deser class
>>> */
>>>
>>> SpecificRecordSerDe<PayloadRecord> deserializer = new SpecificRecordSerDe<>(
>>>          PayloadRecord.class,
>>>          PayloadRecord.getClassSchema().toString(),
>>>          this.schemaRegistry);
>>> FlinkKafkaConsumer<PayloadRecord> consumer = new FlinkKafkaConsumer<>(
>>>          this.inputTopic,
>>>          deserializer,
>>>          this.sourceSettings);
>>>
>>>
>>>
>>> Thanks
>>>
>>> Lian
>>>
>>>
>>>
>>>
>>> On Thu, Sep 17, 2020 at 2:19 AM Dawid Wysakowicz <[hidden email]>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Could you share exactly how do you configure avro & kafka? Do you use
>>>> Table API or DataStream API? Do you use the
>>>> ConfluentRegistryDeserializationSchema that comes with Flink or did you
>>>> built custom DeserializationSchema? Could you maybe share the code for
>>>> instantiating the source with us? It could help us track down the
>>>> problematic spot.
>>>>
>>>> Best,
>>>>
>>>> Dawid
>>>>
>>>> On 16/09/2020 08:09, Lian Jiang wrote:
>>>>> Hi,
>>>>>
>>>>> i am using avro 1.9.1 + Flink 1.10.1 + Confluent Kafka 5.5. In
>>>>> Intellij, I can see the FlinkKafkaConsumer already deserialized the
>>>>> upstream kafka message. However, I got below error when this message
>>>>> is serialized during pushToOperator. Per the stack trace, the reason
>>>>> is that AvroSerializer is created by AvroFactory.fromSpecific() which
>>>>> creates its private copy of specificData. This private specificData
>>>>> does not have logical type information. This blocks the deserialized
>>>>> messages from being passed to downstream operators. Any idea how to
>>>>> make this work? Appreciated very much!
>>>>>
>>>>>
>>>>> org.apache.avro.AvroRuntimeException: Unknown datum type
>>>>> java.time.Instant: 2020-09-15T07:00:00Z
>>>>> at
>>>> org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:887)
>>>>> at
>>>>>
>>>> org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>>>>> at
>>>> org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:850)
>>>>> at
>>>> org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280)
>>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
>>>>> at
>>>> org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261)
>>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
>>>>> at
>>>> org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280)
>>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
>>>>> at
>>>> org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261)
>>>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
>>>>> at
>>>>>
>>>> org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>>>>> at
>>>>>
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
>>>>> at
>>>>>
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>>>>> at
>>>>>
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>>>>> at
>>>>>
>>>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>>>>> at
>>>>>
>>>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>>>>> at
>>>>>
>>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>>>>> at
>>>>>
>>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>>>>> at
>>>>>
>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>>>>> at
>>>>>
>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
>>>>> at
>>>>>
>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>>>>> at
>>>>>
>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>>>>> at
>>>>>
>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>>>
>>>>
>>>
>>> --
>>>
>> --
>>
>
>



--

Reply | Threaded
Open this post in threaded view
|

Re: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z

Lian Jiang
Dawid,

Thanks for the fix. I may wait for Flink 1.12, which comes out at the end of Oct this year. Meanwhile, I want to better understand the current workaround from the beginning of this thread.

My observations:

1. ProcessFunction with streamEnv.getConfig().enableObjectReuse() --> working

2. ProcessFunction without streamEnv.getConfig().enableObjectReuse() --> Not working

Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2020-09-21T18:54:06.216Z
at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:912)
at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:413)
at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:874)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1293)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1224)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1286)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1224)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1293)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1224)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1286)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1224)
at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:243)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)



3. KeyedProcessFunction with streamEnv.getConfig().enableObjectReuse() --> Not working
Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2020-09-21T19:52:58.477Z
at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:912)
at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:413)
at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:874)
at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:272)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:143)
at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:210)
at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:83)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:145)
at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:210)
at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:83)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
at org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize(AvroSerializer.java:186)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:175)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.serializeRecord(SpanningRecordSerializer.java:71)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:117)
at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
... 13 more


The reason 1 works is that the AvroSerializer is created with the class used by the sink operator, which has no
logical types. The reason 2 and 3 do not work is that the AvroSerializer is created with the class used by
the source operator (FlinkKafkaConsumer), which does have logical types. So in case 1 the logical type conversion
issue is hidden rather than resolved: enableObjectReuse with a ProcessFunction will still fail when the class
used by the sink operator has logical types.
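
For reference, the workaround from the top of the thread is a single setting on the environment; a minimal sketch (everything else about the pipeline omitted):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// With object reuse enabled, Flink skips AvroSerializer.copy() between chained
// operators, which is where the deep copy of the unconverted Instant fails.
streamEnv.getConfig().enableObjectReuse();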

Question:
1. Before Flink 1.12 comes out, how do I apply Dawid's fix? If I need to rebuild Flink, it may not work since our
CICD may not support a private Flink build. Also, other dependencies are coupled to the Flink version, and
it will be hard to manage them manually without downloading from the Maven repo.

2. What exactly does enableObjectReuse do? Why, with it enabled, does the ProcessFunction pipeline end up with an
AvroSerializer created for the sink-side class, while the KeyedProcessFunction pipeline still serializes with the
source-side class? If I can make a KeyedProcessFunction (or keyed window function) work using enableObjectReuse,
this workaround is still valuable to me in the short term.


Thanks
Lian

On Tue, Sep 22, 2020 at 12:22 AM Dawid Wysakowicz <[hidden email]> wrote:

Hi Lian,

Thank you for sending the full code for the pojo. It clarified a lot!

I learnt that Avro introduced yet another mechanism for retrieving conversions for logical types in Avro 1.9.x. I was not aware that it creates a static SpecificData field with registered logical conversions if a logical type is part of a union. That's why I did not understand the parts of the code you sent me where you are registering the logical types in the MODEL$ field. The getConversion method is part of the SpecificRecordBase class and is populated by the Avro compiler when a logical type is a top-level type. That bit should work just fine.

Unfortunately we do not support this "feature" of using the static SpecificData field. So far we have created the SpecificData manually in AvroSerializer and Avro(De)SerializationSchema, which is why the conversions are not being picked up. I created a JIRA issue[1] and a PR[2] to support it in Flink 1.12.

The only workaround I can see in earlier versions of Flink is to change the AvroSerializer manually. You would need to do something similar to what I do in the linked PR.
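
Roughly, the idea is to pick up the SpecificData instance that the Avro compiler generates into the class, instead of always creating a fresh one. A sketch of that idea only, assuming the generated class exposes the static MODEL$ field shown further down this thread (the actual code in the PR may differ):

import java.lang.reflect.Field;
import org.apache.avro.specific.SpecificData;

static SpecificData specificDataFor(Class<?> tClass) {
    try {
        // Avro 1.9.x generates a static MODEL$ field that already has the
        // logical-type conversions registered when a logical type is in a union.
        Field field = tClass.getDeclaredField("MODEL$");
        field.setAccessible(true);
        return (SpecificData) field.get(null);
    } catch (NoSuchFieldException | IllegalAccessException e) {
        // Fall back to a plain SpecificData without extra conversions.
        return new SpecificData();
    }
}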

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-19339

[2] https://github.com/apache/flink/pull/13450

On 21/09/2020 19:28, Lian Jiang wrote:
Thanks guys. Given Flink 1.12 is not ready (e.g. not available in the Maven repo), I need to stick to 1.11.

Dawid,

For the code throwing "java.lang.Long cannot be cast to java.time.Instant",

The avro schema has:
union {null, timestamp_ms } eventTime = null;

The avro pojo does have the logical type conversion:
  private static SpecificData MODEL$ = new SpecificData();
  static {
      MODEL$.addLogicalTypeConversion(new org.apache.avro.data.TimeConversions.TimestampMillisConversion());
  }

I don't see the SpecificRecordBase#getConversion() you mentioned in the Avro repo.
The pojo code throws here:
public void put(int field$, java.lang.Object value$) {
  switch (field$) {
  case 3: eventTime = (java.time.Instant)value$; break; // throws here
  }
}
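
For what it's worth, the cast fails because the reader's SpecificData has no TimestampMillisConversion registered, so readField() hands put() a raw Long. A sketch of a reader that would perform the conversion (MetadataRecord as in the stack trace above; whether Flink lets you inject such a reader is the open question):

SpecificData data = new SpecificData();
// With the conversion registered, the reader turns the long into an Instant
// before calling put(); without it, put() receives a Long and the cast throws.
data.addLogicalTypeConversion(new org.apache.avro.data.TimeConversions.TimestampMillisConversion());
SpecificDatumReader<MetadataRecord> reader = new SpecificDatumReader<>(
        MetadataRecord.getClassSchema(), MetadataRecord.getClassSchema(), data);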

I will send the full avdl and pojo offline to you for a close look.


Regards
Lian

        

On Mon, Sep 21, 2020 at 12:31 AM Aljoscha Krettek <[hidden email]> wrote:
Hi All,

Avro was finally bumped in
https://issues.apache.org/jira/browse/FLINK-18192.

The implementers didn't see
https://issues.apache.org/jira/browse/FLINK-12532, but it is also
updated now.

Best,
Aljoscha

On 21.09.20 08:04, Arvid Heise wrote:
> Hi Lian,
>
> we had a similar discussion on [1].
>
> TL;DR you are using Avro 1.9.x while Flink still bundles Avro 1.8 [2] until
> Hive bumps it [3]. In the thread, I gave some options to avoid running into
> the issue.
> The easiest fix is to use Avro 1.8.2 all the way, but you may run into [4]
> if your logical type is nullable (which is not necessary in most cases).
>
> Still, I think it's time for us to revise the decision to wait for Hive to
> bump it and instead upgrade independently. Avro was stuck on 1.8 for a long
> time, but the project has gained traction again in the past two years. On the
> other hand, Hive seems to be rather slow to respond to that, and we
> shouldn't let a slow-moving component block us from supporting a fast-moving
> component when it's so apparent that users want it.
> @Aljoscha Krettek <[hidden email]> could you please pick that topic up
> and ping the respective maintainers?
>
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reading-from-AVRO-files-td35850.html
> [2] https://issues.apache.org/jira/browse/FLINK-12532
> [3] https://issues.apache.org/jira/browse/HIVE-21737
> [4] https://issues.apache.org/jira/browse/AVRO-1891
>
> On Sun, Sep 20, 2020 at 9:56 PM Lian Jiang <[hidden email]> wrote:
>
>> Thanks Dawid for proposing ConfluentRegistryDeserializationSchema. I am
>> trying ConfluentRegistryAvroDeserializationSchema (if this is what you
>> mean) but got "java.lang.Long cannot be cast to java.time.Instant". This
>> may be caused by https://issues.apache.org/jira/browse/FLINK-11030. Is there
>> any progress on this JIRA? Thanks. Regards!
>>
>>
>> Stacktrace:
>> java.lang.ClassCastException: java.lang.Long cannot be cast to
>> java.time.Instant
>> at com.mycompany.mypayload.MetadataRecord.put(MetadataRecord.java:136)
>> at org.apache.avro.generic.GenericData.setField(GenericData.java:795)
>> at
>> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139)
>> at
>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
>> at
>> org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
>> at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
>> at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
>> at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
>> at
>> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
>> at
>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
>> at
>> org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
>> at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
>> at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
>> at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
>> at
>> org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:74)
>> at
>> com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:89)
>> at
>> com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:16)
>> at
>> org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema.deserialize(KafkaDeserializationSchema.java:80)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>> at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>> at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>> at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>>
>>
>>
>> Code:
>>
>> import org.apache.avro.specific.SpecificRecord;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
>> import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
>> import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
>> import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
>> import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
>> import org.apache.kafka.clients.consumer.ConsumerRecord;
>> import org.apache.kafka.clients.producer.ProducerRecord;
>>
>> import javax.annotation.Nullable;
>> import java.io.Serializable;
>>
>> public class SpecificRecordSerDe<T extends SpecificRecord> implements
>>          KafkaSerializationSchema<T>, KafkaContextAware<T>, KafkaDeserializationSchema<T>, Serializable {
>>
>>      private final Class<T> tClass;
>>      private String topic; // for serializer
>>      private String subject; // for serializer
>>      private final String schemaRegistryUrl;
>>      private ConfluentRegistryAvroSerializationSchema<T> serializer;
>>      private ConfluentRegistryAvroDeserializationSchema<T> deserializer;
>>
>>      private static final Object lock = new Object();
>>
>>      public static <T extends SpecificRecord> SpecificRecordSerDe<T> forDeserializer(final Class<T> tClass, String schemaRegistryUrl) {
>>          return new SpecificRecordSerDe<>(tClass, schemaRegistryUrl);
>>      }
>>
>>      public static <T extends SpecificRecord> SpecificRecordSerDe<T> forSerializer(final Class<T> tClass,
>>                                                                                    String schemaRegistryUrl,
>>                                                                                    final String topic,
>>                                                                                    final String subject) {
>>          return new SpecificRecordSerDe<>(tClass, schemaRegistryUrl, topic, subject);
>>      }
>>
>>      private SpecificRecordSerDe(final Class<T> tClass, String schemaRegistryUrl) {
>>          this.tClass = tClass;
>>          this.schemaRegistryUrl = schemaRegistryUrl;
>>      }
>>
>>      private SpecificRecordSerDe(final Class<T> tClass,
>>                                  final String schemaRegistryUrl,
>>                                  final String topic,
>>                                  final String subject) {
>>          this(tClass, schemaRegistryUrl);
>>          this.topic = topic;
>>          this.subject = subject;
>>      }
>>
>>      @Override
>>      public ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp) {
>>          if (this.serializer == null) {
>>              synchronized (lock) {
>>                  if (this.serializer == null) {
>>                      this.serializer = ConfluentRegistryAvroSerializationSchema
>>                              .forSpecific(tClass, this.subject, this.schemaRegistryUrl);
>>                  }
>>              }
>>          }
>>
>>          byte[] bytes = this.serializer.serialize(element);
>>          return new ProducerRecord<>(this.topic, bytes);
>>      }
>>
>>      public boolean isEndOfStream(T nextElement) {
>>          return false;
>>      }
>>
>>      @Override
>>      public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
>>          if (deserializer == null) {
>>              synchronized (lock) {
>>                  if (deserializer == null) {
>>                      deserializer = ConfluentRegistryAvroDeserializationSchema
>>                              .forSpecific(tClass, this.schemaRegistryUrl);
>>                  }
>>              }
>>          }
>>
>>          return deserializer.deserialize(record.value());
>>      }
>>
>>      @Override
>>      public String getTargetTopic(T element) {
>>          return this.topic;
>>      }
>>
>>      @Override
>>      public TypeInformation<T> getProducedType() {
>>          return TypeInformation.of(tClass);
>>      }
>> }
>>
>>
--
Reply | Threaded
Open this post in threaded view
|

Re: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z

Arvid Heise-3
Hi Lian,

sorry for the late reply.

1. All serialization-related functions are just implementations of API interfaces. As such, you can implement serializers yourself. In this case, you could simply copy the code from 1.12 into your application, though you may need to adjust a few things that differ between 1.11 and 1.12.
2. enableObjectReuse avoids copying records between chained operators. The chain ends at any keyBy.
A possible workaround is to enableObjectReuse and then convert the datum with logical types into a datum without logical types if possible (potentially dropping a few fields to reduce network traffic along the way).
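
A minimal sketch of that conversion (EventPojo, the getters, and MyKeyedProcessFunction are made up for illustration; substitute your own types):

// Plain POJO with no logical types: epoch millis instead of java.time.Instant.
public class EventPojo {
    public String id;
    public Long eventTimeMillis;
    public EventPojo() {}
    public EventPojo(String id, Long eventTimeMillis) {
        this.id = id;
        this.eventTimeMillis = eventTimeMillis;
    }
}

// Convert right after the source, before the keyBy ends the chain, so the record
// that is serialized between operators carries no logical types.
DataStream<EventPojo> plain = source
        .map(r -> new EventPojo(r.getId(),
                r.getEventTime() == null ? null : r.getEventTime().toEpochMilli()))
        .returns(EventPojo.class);
plain.keyBy(p -> p.id).process(new MyKeyedProcessFunction());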

On Wed, Sep 23, 2020 at 6:11 PM Lian Jiang <[hidden email]> wrote:
Dawid,

Thanks for the fix. I may wait for Flink 1.12 coming out at the end of Oct this year. Meanwhile, I may want to better understand the current solution at the beginning of this thread.

My observations:

1. ProcessFunction with streamEnv.getConfig().enableObjectReuse() --> working

2. ProcessFunction without streamEnv.getConfig().enableObjectReuse() --> Not working

Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2020-09-21T18:54:06.216Z
at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:912)
at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:413)
at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:874)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1293)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1224)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1286)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1224)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1293)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1224)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1286)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1224)
at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:243)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)



3. KeyedProcessFunction with streamEnv.getConfig().enableObjectReuse() --> Not working
Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2020-09-21T19:52:58.477Z
at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:912)
at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:413)
at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:874)
at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:272)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:143)
at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:210)
at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:83)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:145)
at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:210)
at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:83)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
at org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize(AvroSerializer.java:186)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:175)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.serializeRecord(SpanningRecordSerializer.java:71)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:117)
at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
... 13 more


The reason why 1 works is because AvroSerializer is created with the class used by the sink operator. This class does not have
any logical types. The reason why 2 and 3 do not work is that AvroSerializer is created with the class used by
the source operator FlinkKafkaConsumer. This class has logical types. So the logical type conversion issue is
hidden (instead of resolved) in 1. In other words, enableObjectReuse with processFunction will still fail when the class 
in the sink operator has logical types.

Question:
1. Before Flink 1.12 comes out, how do I apply Dawid's fix? If I need to rebuild Flink, it may not work since our 
CICD may not support a private Flink build. Also, there are other dependencies coupled with Flink version and
it will be hard to manually manage them without downloading from maven repo.

2. What does enableObjectReuse exactly do? Why it causes processFunction's AvroSerializer to use the class in the source
operator while keyedProcessFunction's AvroSerializer to use the class in the sink operator? If I can make keyedProcessFunction
(or keyed window function) work using enableObjectReuse, this workaround is still valuable for me in the short term.


Thanks
Lian


 













On Tue, Sep 22, 2020 at 12:22 AM Dawid Wysakowicz <[hidden email]> wrote:

Hi Lian,

Thank you for sending the full code for the pojo. It clarified a lot!

I learnt that Avro introduced yet another mechanism for retrieving conversions for logical types in Avro 1.9.x. I was not aware they create a static SpecificData field with registered logical conversions if a logical type is part of a union. That's why I did not understand the parts of the you sent me where you are registering the logical types in the MODEL$ field. The getConversion method is part of the SpecificRecordBase class and is being populated by Avro compiler when a logical type is a top level type. This bit should work just fine.

Unfortunately we do not support this "feature" of using the static SpecificData field. So far we create the SpecificData manually in AvroSerializer and Avro(De)SerializationSchema that is why the conversions are not being picked up. I created a JIRA issue[1] and a PR[2] to support it in Flink 1.12.

The only workaround I can see in earlier versions of Flink is to change the AvroSerializer manually. You would need to do a similar thing as I do in the linked PR.

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-19339

[2] https://github.com/apache/flink/pull/13450

On 21/09/2020 19:28, Lian Jiang wrote:
Thanks guys. Given Flink 1.12 is not ready (e.g. not available in Maven repo), I need to stick to 1.11.

Dawid,

For the code throwing "java.lang.Long cannot be cast to java.time.Instant",

The avro schema has:
union {null, timestamp_ms } eventTime = null;

The avro pojo does have the logical type conversion:
  private static SpecificData MODEL$ = new SpecificData();
static {
    MODEL$.addLogicalTypeConversion(new org.apache.avro.data.TimeConversions.TimestampMillisConversion());
  }

I don't see SpecificRecord#getConversions() you mentioned in avro repo.
The pojo code throws:
public void put(int field$, java.lang.Object value$) {
  switch (field$) {
  case 3: eventTime = (java.time.Instant)value$; break; // throw here
  }

I will send the full avdl and pojo offline to you for a close look.


Regards
Lian

        

On Mon, Sep 21, 2020 at 12:31 AM Aljoscha Krettek <[hidden email]> wrote:
Hi All,

Avro was finally bumped in
https://issues.apache.org/jira/browse/FLINK-18192.

The implementers didn't see
https://issues.apache.org/jira/browse/FLINK-12532, but it is also
updated now.

Best,
Aljoscha

On 21.09.20 08:04, Arvid Heise wrote:
> Hi Lian,
>
> we had a similar discussion on [1].
>
> TL;DR you are using Avro 1.9.x while Flink still bundles Avro 1.8 [2] until
> Hive bumps it [3]. In the thread, I gave some options to avoid running into
> the issue.
> The easiest fix is to use Avro 1.8.2 all the way, but you may run into [4]
> if your logical type is nullable (which is not necessary in most cases).
>
> Still, I think it's time for us to revise the decision to wait for Hive to
> bump and rather upgrade independently. Avro was for a long time stuck on
> 1.8 but the project gained traction again in the past two years. On the
> other hand, Hive seems to be rather slow to respond to that and we
> shouldn't have a slow moving component block us to support a fast moving
> component if it's such apparent that users want it.
> @Aljoscha Krettek <[hidden email]> could you please pick that topic up
> and ping the respective maintainers?
>
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reading-from-AVRO-files-td35850.html
> [2] https://issues.apache.org/jira/browse/FLINK-12532
> [3] https://issues.apache.org/jira/browse/HIVE-21737
> [4] https://issues.apache.org/jira/browse/AVRO-1891
>
> On Sun, Sep 20, 2020 at 9:56 PM Lian Jiang <[hidden email]> wrote:
>
>> Thanks Dawid for proposing ConfluentRegistryDeserializationSchema. I am
>> trying ConfluentRegistryAvroDeserializationSchema (if this is what you
>> mean) but got "java.lang.Long cannot be cast to java.time.Instant". This
>> may be caused by https://issues.apache.org/jira/browse/FLINK-11030.
>> <https://issues.apache.org/jira/browse/FLINK-11030> Is there any progress
>> for this JIRA? Thanks. Regards!
>>
>>
>> Stacktrace:
>> java.lang.ClassCastException: java.lang.Long cannot be cast to
>> java.time.Instant
>> at com.mycompany.mypayload.MetadataRecord.put(MetadataRecord.java:136)
>> at org.apache.avro.generic.GenericData.setField(GenericData.java:795)
>> at
>> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139)
>> at
>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
>> at
>> org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
>> at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
>> at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
>> at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
>> at
>> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
>> at
>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
>> at
>> org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
>> at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
>> at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
>> at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
>> at
>> org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:74)
>> at
>> com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:89)
>> at
>> com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:16)
>> at
>> org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema.deserialize(KafkaDeserializationSchema.java:80)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>> at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>> at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>> at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>>
>>
>>
>> Code:
>>
>> import org.apache.avro.specific.SpecificRecord;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
>> import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
>> import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
>> import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
>> import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
>> import org.apache.kafka.clients.consumer.ConsumerRecord;
>> import org.apache.kafka.clients.producer.ProducerRecord;
>>
>> import javax.annotation.Nullable;
>> import java.io.Serializable;
>>
>> public class SpecificRecordSerDe<T extends SpecificRecord> implements
>>          KafkaSerializationSchema<T>, KafkaContextAware<T>, KafkaDeserializationSchema<T>, Serializable {
>>
>>      private final Class<T> tClass;
>>      private String topic; // for serializer
>>      private String subject; // for serializer
>>      private final String schemaRegistryUrl;
>>      private ConfluentRegistryAvroSerializationSchema<T> serializer;
>>      private ConfluentRegistryAvroDeserializationSchema<T> deserializer;
>>
>>      private static final Object lock = new Object();
>>
>>      public static <T> SpecificRecordSerDe forDeserializer(final Class<T> tClass, String schemaRegistryUrl) {
>>          return new SpecificRecordSerDe(tClass, schemaRegistryUrl);
>>      }
>>
>>      public static <T> SpecificRecordSerDe forSerializer(final Class<T> tClass,
>>                                                          String schemaRegistryUrl,
>>                                                          final String topic,
>>                                                          final String subject) {
>>          return new SpecificRecordSerDe(tClass, schemaRegistryUrl, topic, subject);
>>      }
>>
>>      private SpecificRecordSerDe(final Class<T> tClass, String schemaRegistryUrl) {
>>          this.tClass = tClass;
>>          this.schemaRegistryUrl = schemaRegistryUrl;
>>      }
>>
>>      private SpecificRecordSerDe(final Class<T> tClass,
>>                                  final String schemaRegistryUrl,
>>                                  final String topic,
>>                                  final String subject) {
>>          this(tClass, schemaRegistryUrl);
>>          this.topic = topic;
>>          this.subject = subject;
>>      }
>>
>>      @Override
>>      public ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp) {
>>          if (this.serializer == null) {
>>              synchronized (lock) {
>>                  if (this.serializer == null) {
>>                      this.serializer = ConfluentRegistryAvroSerializationSchema
>>                              .forSpecific(tClass, this.subject, this.schemaRegistryUrl);
>>                  }
>>              }
>>          }
>>
>>          byte[] bytes = this.serializer.serialize(element);
>>          return new ProducerRecord<>(this.topic, bytes);
>>      }
>>
>>      public boolean isEndOfStream(T nextElement) {
>>          return false;
>>      }
>>
>>      @Override
>>      public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
>>          if (deserializer == null) {
>>              synchronized (lock) {
>>                  if (deserializer == null) {
>>                      deserializer = ConfluentRegistryAvroDeserializationSchema
>>                              .forSpecific(tClass, this.schemaRegistryUrl);
>>                  }
>>              }
>>          }
>>
>>          return deserializer.deserialize(record.value());
>>      }
>>
>>      @Override
>>      public String getTargetTopic(T element) {
>>          return this.topic;
>>      }
>>
>>      @Override
>>      public TypeInformation<T> getProducedType() {
>>          return TypeInformation.of(tClass);
>>      }
>> }
>>
>>
>>
>>
>> On Thu, Sep 17, 2020 at 9:42 AM Dawid Wysakowicz <[hidden email]>
>> wrote:
>>
>>> Thanks for the update.
>>>
>>> First of all, why did you decide to build your own DeserializationSchema
>>> instead of using ConfluentRegistryDeserializationSchema? Your
>>> implementation is quite inefficient you do deserialize > serialize >
>>> deserialize. Serialization/deserialization is usually one of the heaviest
>>> operations in the pipeline.
>>>
>>> What do you return in your getProducedType? From the stack trace I guess
>>> you are instantiating the AvroTypeInfo? Could you maybe share a full
>>> runnable example? It would make it much easier to help you.
>>>
>>> Moreover the pattern with registering custom conversions in a
>>> SpecificData will not work with AvroSerializer. Custom serializers should
>>> be defined in the generated SpecificRecord (in your case PayloadRecord) in
>>> the SpecificRecordBase#getConversion().
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>>
>>> On 17/09/2020 16:34, Lian Jiang wrote:
>>>
>>> Piotr/Dawid,
>>>
>>> Thanks for the reply. FLINK-18223 seems not to related to this issue and
>>> I double checked that I am using Flink 1.11.0 instead of 1.10.0. My
>>> mistake. StreamExecutionEnvironment#getConfig()#enableObjectReuse()) solved
>>> the issue.
>>>
>>> I am not using ConfluentRegistryDeserializationSchema. Instead, I am
>>> creating custom DeserializationSchema:
>>>
>>>
>>> /*
>>> the deser class
>>> */
>>> public class SpecificRecordSerDe<T extends SpecificRecord> implements
>>>
>>>          KafkaSerializationSchema<T>, KafkaContextAware<T>, KafkaDeserializationSchema<T>, Serializable {
>>> private final Class<T> tClass;private final String tSchemaStr;private volatile transient Schema tSchema;private String topic;private String schemaRegistryUrl;private KafkaAvroSerializer serializer;private KafkaAvroDecoder decoder;
>>> public SpecificRecordSerDe(final Class<T> tClass, String tSchemaStr, String schemaRegistryUrl) {
>>>      this.tClass = tClass;
>>>      this.tSchemaStr = tSchemaStr;
>>>      this.topic = null;
>>>      this.schemaRegistryUrl = schemaRegistryUrl;
>>> }
>>>
>>> @Overridepublic T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {    CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(
>>>          schemaRegistryUrl,
>>>          4);
>>>      decoder = new KafkaAvroDecoder(client);
>>>      GenericRecord generic = (GenericRecord) decoder.fromBytes(record.value());    DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(generic.getSchema(), ManagedSpecificData.getForClass(tClass));
>>>      ByteArrayOutputStream out = new ByteArrayOutputStream();
>>>      Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
>>>      writer.write(generic, encoder);
>>>      encoder.flush();
>>>
>>>      byte[] avroData = out.toByteArray();
>>>      out.close();
>>>      tSchema = new org.apache.avro.Schema.Parser().parse(tSchemaStr);
>>>      SpecificDatumReader<T> reader = new SpecificDatumReader<>(
>>>              generic.getSchema(), tSchema, ManagedSpecificData.getForClass(tClass));
>>>      Decoder anotherDecoder = DecoderFactory.get().binaryDecoder(avroData, null);
>>>      T res = reader.read(null, anotherDecoder);
>>>
>>>      return res;
>>> }
>>> }
>>>
>>>
>>> /*
>>> the specificData class
>>> */public class ManagedSpecificData extends SpecificData {     private static ManagedSpecificData getManagedSpecificData() {
>>>      ManagedSpecificData res = new ManagedSpecificData();
>>>
>>>      registerAdvancedType(new TimestampMillisType(), res);
>>>      registerAdvancedType(new LocalDateType(), res);
>>>
>>>      return res;
>>> }}
>>>
>>>
>>> /*
>>>
>>> how we use above deser class
>>> */
>>>
>>> SpecificRecordSerDe<PayloadRecord> deserializer = new SpecificRecordSerDe<>(
>>>          PayloadRecord.class,
>>>          PayloadRecord.getClassSchema().toString(),
>>>          this.schemaRegistry);
>>> FlinkKafkaConsumer consumer = new FlinkKafkaConsumer(
>>>          this.inputTopic,
>>>          deserializer,
>>>          this.sourceSettings);
>>>
>>>
>>>
>>> Thanks
>>>
>>> Lian
>>>
>>>
>>>
>>>
>>> On Thu, Sep 17, 2020 at 2:19 AM Dawid Wysakowicz <[hidden email]>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Could you share exactly how do you configure avro & kafka? Do you use
>>>> Table API or DataStream API? Do you use the
>>>> ConfluentRegistryDeserializationSchema that comes with Flink or did you
>>>> built custom DeserializationSchema? Could you maybe share the code for
>>>> instantiating the source with us? It could help us track down the
>>>> problematic spot.
>>>>
>>>> Best,
>>>>
>>>> Dawid