FlinkKinesisConsumer throws "Unknown datum type org.joda.time.DateTime"

FlinkKinesisConsumer throws "Unknown datum type org.joda.time.DateTime"

Lian Jiang
Hi,

I use a FlinkKinesisConsumer in a Flink job to deserialize Kinesis events.

Flink: 1.9.2
Avro: 1.9.2

The serDe class is like:
public class ManagedSchemaKinesisPayloadSerDe<T extends SpecificRecord>
        implements KinesisSerializationSchema<T>, KinesisDeserializationSchema<T> {

    private static final String REGISTRY_ENDPOINT = "https://schema-registry.my.net";
    private static final long serialVersionUID = -1L;
    private final Class<T> tClass;
    private String topic;

    public ManagedSchemaKinesisPayloadSerDe(final Class<T> tClass) {
        this.tClass = tClass;
        this.topic = null;
        SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());
    }

    @Override
    public ByteBuffer serialize(T obj) {
        Properties props = new Properties();
        props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false);
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, REGISTRY_ENDPOINT);

        // some code to create schemaReg
        final KafkaAvroSerializer serializer = new KafkaAvroSerializer(schemaReg, new HashMap(props));
        return ByteBuffer.wrap(serializer.serialize(topic, obj));
    }

    @Override
    public T deserialize(
            byte[] record,
            String partitionKey,
            String sequenceNumber,
            long eventUtcTimestamp,
            String streamName,
            String shardId) throws IOException {
        Properties props = new Properties();
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, Boolean.toString(true));
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, REGISTRY_ENDPOINT);
        VerifiableProperties vProps = new VerifiableProperties(props);

        // some code to create schemaReg
        final KafkaAvroDecoder decoder = new KafkaAvroDecoder(schemaReg, vProps);
        return (T) decoder.fromBytes(record);
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeInformation.of(tClass);
    }
} // end of class ManagedSchemaKinesisPayloadSerDe


// create consumer, stream environment:
ManagedSchemaKinesisPayloadSerDe<MyPoJoRecord> serDe =
        new ManagedSchemaKinesisPayloadSerDe<>(MyPoJoRecord.class);

final FlinkKinesisConsumer<MyPoJoRecord> consumer = new FlinkKinesisConsumer<>(
        streamName,
        serDe,
        streamConfig);

streamEnv
        .addSource(consumer)
        .print();
streamEnv.execute();


The exception:
java.lang.RuntimeException: Unknown datum type org.joda.time.DateTime: 2020-02-16T19:14:20.983Z
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:766)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.access$000(KinesisDataFetcher.java:91)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:272)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:287)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:284)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:748)
at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:371)
at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:258)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type org.joda.time.DateTime: 2020-02-16T19:14:20.983Z
at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:772)
at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:302)
at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:737)
at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:205)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:123)
at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:87)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:125)
at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:87)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
at org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize(AvroSerializer.java:185)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:175)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.serializeRecord(SpanningRecordSerializer.java:78)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:152)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
... 19 more



Observation:
1. The Kinesis events are deserialized SUCCESSFULLY. The exception occurs afterwards, when Flink
serializes the already-deserialized object to send it downstream. For that, Flink creates its
own DatumWriter from MyPoJoRecord (a subclass of SpecificRecord). I have no control over this
DatumWriter; for example, I cannot add a logical type conversion or register a custom Kryo serializer.
GenericData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion()) does not resolve the problem.

2. The org.joda.time.DateTime mentioned in the exception comes from a "union { null, timestamp_ms }"
field. This union is handled by org.apache.avro.generic.GenericData.resolveUnion(). Because of 1,
resolveUnion() cannot find any conversions and fails to handle the Joda time value.
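For illustration, a field producing that union would look roughly like this in the .avsc schema (the field name here is hypothetical, not from the thread); with the Avro 1.8.x code generator, timestamp-millis fields come out as org.joda.time.DateTime in the generated SpecificRecord class:

```json
{
  "name": "eventTime",
  "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}],
  "default": null
}
```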

Question:
Is it expected that FlinkKinesisConsumer cannot handle Joda time? Any solution here?
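One possible workaround (a sketch, not from the thread, assuming you can change the emitted type) is to keep the Joda object out of Flink's own serialization entirely: right after deserialization, map the record to a plain POJO whose timestamp field is an epoch-millis long or a java.time.Instant, so the downstream network serializer never sees org.joda.time.DateTime. The conversion itself is trivial:

```java
import java.time.Instant;

// Sketch: carry Avro timestamp-millis values as plain longs (or java.time.Instant)
// in the POJO emitted downstream, instead of org.joda.time.DateTime.
// Class and method names here are hypothetical, not from the thread.
public class TimestampMillis {

    // Avro's timestamp-millis logical type is physically a long of epoch millis.
    static Instant fromEpochMillis(long millis) {
        return Instant.ofEpochMilli(millis);
    }

    static long toEpochMillis(Instant instant) {
        return instant.toEpochMilli();
    }

    public static void main(String[] args) {
        // The value from the exception message: 2020-02-16T19:14:20.983Z
        long millis = 1581880460983L;
        Instant ts = fromEpochMillis(millis);
        System.out.println(ts);                          // prints 2020-02-16T19:14:20.983Z
        System.out.println(toEpochMillis(ts) == millis); // prints true
    }
}
```

In the job this would mean something like `streamEnv.addSource(consumer).map(...)` applying such a conversion; note it only avoids the failing serialization if the map is chained to the source (no network hop in between), which is Flink's default for a simple source-to-map edge.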

Appreciate very much!





Re: FlinkKinesisConsumer throws "Unknown datum type org.joda.time.DateTime"

Chesnay Schepler
From the stack trace it appears that flink-avro is used, which uses avro 1.8.2 internally by default,
for which this appears to be a known issue that was fixed in 1.9.?.

Are you sure that avro 1.9.2 is actually being used?

On 19/02/2020 23:53, Lian Jiang wrote:


Re: FlinkKinesisConsumer throws "Unknown datum type org.joda.time.DateTime"

Lian Jiang
Thanks. I see from mvnrepository that even flink-avro 1.10 (the latest) uses Avro 1.8.2. Does this mean I have to use GenericData instead of Avro POJOs if I use FlinkKinesisConsumer?

Sent from my iPhone

> On Feb 20, 2020, at 4:34 AM, Chesnay Schepler <[hidden email]> wrote:

Re: FlinkKinesisConsumer throws "Unknown datum type org.joda.time.DateTime"

Chesnay Schepler
What you could try is bumping the Avro dependency to 1.9.2. I don't know whether this will just work, but it's worth a shot.
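For a Maven build, one way to try this (a sketch; these are the standard Avro coordinates, but whether the rest of the stack tolerates Avro 1.9.2 on the classpath is exactly the open question) is to pin the version so it wins over the 1.8.2 pulled in transitively by flink-avro:

```xml
<!-- Force Avro 1.9.2 ahead of the 1.8.2 that flink-avro pulls in transitively. -->
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.9.2</version>
    </dependency>
  </dependencies>
</dependencyManagement>
```

Afterwards, `mvn dependency:tree -Dincludes=org.apache.avro` shows which version actually ends up on the classpath.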

On 20/02/2020 16:25, Lian Jiang wrote: