I use a FlinkKinesisConsumer in a Flink job to consume and deserialize Kinesis events.
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Properties;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDecoder;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import kafka.utils.VerifiableProperties;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;

public class ManagedSchemaKinesisPayloadSerDe<T extends SpecificRecord>
        implements KinesisSerializationSchema<T>, KinesisDeserializationSchema<T> {

    private static final String REGISTRY_ENDPOINT = "https://schema-registry.my.net";
    private static final long serialVersionUID = -1L;

    private final Class<T> tClass;
    private String topic;

    public ManagedSchemaKinesisPayloadSerDe(final Class<T> tClass) {
        this.tClass = tClass;
        this.topic = null;
        // Register the Joda timestamp conversion so that timestamp-millis logical
        // types are handled by the Avro (de)serialization done inside this SerDe.
        SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());
    }

    @Override
    public ByteBuffer serialize(T obj) {
        Properties props = new Properties();
        props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false);
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, REGISTRY_ENDPOINT);
        // some code to create schemaReg (a SchemaRegistryClient)
        final KafkaAvroSerializer serializer = new KafkaAvroSerializer(schemaReg, new HashMap(props));
        return ByteBuffer.wrap(serializer.serialize(topic, obj));
    }

    @Override
    public T deserialize(
            byte[] record,
            String partitionKey,
            String sequenceNumber,
            long eventUtcTimestamp,
            String streamName,
            String shardId) throws IOException {
        Properties props = new Properties();
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, Boolean.toString(true));
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, REGISTRY_ENDPOINT);
        VerifiableProperties vProps = new VerifiableProperties(props);
        // some code to create schemaReg (a SchemaRegistryClient)
        final KafkaAvroDecoder decoder = new KafkaAvroDecoder(schemaReg, vProps);
        @SuppressWarnings("unchecked")
        final T result = (T) decoder.fromBytes(record);
        return result;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        // Per the stack trace below, for this SpecificRecord subclass Flink resolves
        // the type to AvroTypeInfo, i.e. downstream records are (re)serialized by
        // Flink's own AvroSerializer.
        return TypeInformation.of(tClass);
    }
} // end of class ManagedSchemaKinesisPayloadSerDe
// create consumer, stream environment (streamName and the region value are illustrative):
final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

final Properties streamConfig = new Properties();
streamConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
final String streamName = "my-stream";

final ManagedSchemaKinesisPayloadSerDe<MyPoJoRecord> serDe =
        new ManagedSchemaKinesisPayloadSerDe<>(MyPoJoRecord.class);

final FlinkKinesisConsumer<MyPoJoRecord> consumer = new FlinkKinesisConsumer<>(
        streamName,
        serDe,
        streamConfig);

streamEnv
        .addSource(consumer)
        .print();
streamEnv.execute();
The exception:
java.lang.RuntimeException: Unknown datum type org.joda.time.DateTime: 2020-02-16T19:14:20.983Z
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:766)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.access$000(KinesisDataFetcher.java:91)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:272)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:287)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:284)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:748)
at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:371)
at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:258)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type org.joda.time.DateTime: 2020-02-16T19:14:20.983Z
at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:772)
at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:302)
at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:737)
at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:205)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:123)
at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:87)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:125)
at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:87)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
at org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize(AvroSerializer.java:185)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:175)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.serializeRecord(SpanningRecordSerializer.java:78)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:152)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
... 19 more
Observations:
1. The Kinesis events are deserialized SUCCESSFULLY. The exception is thrown after deserialization, when the record is re-serialized to be sent downstream. Per the stack trace, Flink's AvroSerializer creates its own DatumWriter from MyPoJoRecord (a subclass of SpecificRecord). I have no control over this DatumWriter: for example, I cannot add a logical type conversion to it or register a custom Kryo serializer for it. Calling GenericData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion()) does not resolve the problem (see the first sketch after this list).
2. The org.joda.time.DateTime in the exception comes from a "union { null, timestamp_ms }" field. That union is handled by org.apache.avro.generic.GenericData.resolveUnion(). Because of 1, resolveUnion() cannot find any registered conversions and fails on the Joda time value (reproduced in the second sketch after this list).
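To make observation 1 concrete, here is a minimal sketch of the two registrations it refers to. Both compile and run, but neither reaches the DatumWriter inside Flink's AvroSerializer. JodaDateTimeSerializer is the third-party Kryo serializer from de.javakaffee:kryo-serializers that the Flink documentation suggests for Joda types; it is shown only to illustrate the knob, since a SpecificRecord type resolves to AvroTypeInfo and is never serialized through Kryo here.

import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.specific.SpecificData;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.joda.time.DateTime;

public class AttemptedRegistrations {
    public static void main(String[] args) {
        // Attempt 1: register the Joda timestamp conversion on the global Avro models.
        // This is what observation 1 refers to; it does not fix the job.
        GenericData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());
        SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());

        // Attempt 2: register a Kryo serializer for DateTime with Flink.
        // Also ineffective here: the SpecificRecord type goes through AvroSerializer, not Kryo.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
    }
}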
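And here is a standalone sketch of the mechanism in observation 2, outside Flink entirely. The single-field schema is hypothetical, and Avro 1.8.x with the Joda-based TimeConversions is assumed: a datum writer whose data model has no logical type conversions fails in resolveUnion() with exactly the error from the stack trace above.

import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.joda.time.DateTime;

public class UnionRepro {
    public static void main(String[] args) throws Exception {
        // Hypothetical record with a single "union { null, timestamp_ms }" field.
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Repro\",\"fields\":["
                + "{\"name\":\"ts\",\"type\":[\"null\","
                + "{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}]}]}");

        GenericRecord rec = new GenericData.Record(schema);
        rec.put("ts", DateTime.now()); // Joda value, as in the generated SpecificRecord

        // A writer over a data model with no logical type conversions:
        // resolveUnion() cannot map DateTime to the timestamp-millis branch.
        GenericDatumWriter<GenericRecord> writer =
                new GenericDatumWriter<>(schema, new GenericData());
        BinaryEncoder enc = EncoderFactory.get().binaryEncoder(new ByteArrayOutputStream(), null);
        writer.write(rec, enc); // throws AvroRuntimeException: Unknown datum type org.joda.time.DateTime

        // Registering the conversion on the writer's data model fixes this standalone
        // case; that is exactly what cannot be done for the DatumWriter inside
        // Flink's AvroSerializer.
    }
}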
Question:
Is it expected that FlinkKinesisConsumer cannot handle Joda time? Is there any solution or workaround?
Any help is very much appreciated!