Re: Using logicalType in the Avro table format

Posted by Dawid Wysakowicz-2 on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Using-logicalType-in-the-Avro-table-format-tp34803p34841.html

Hi Gyula,

I have not verified this locally yet, but I think you are hitting yet another problem caused by the unfinished migration from the old TypeInformation-based type system to the new type system based on DataTypes. As far as I understand the problem, the information about the bridging class (java.sql.Timestamp in this case) is lost somewhere in the stack. Because this information is lost or not respected, the planner produces a LocalDateTime instead of a proper java.sql.Timestamp. The AvroRowSerializationSchema expects a java.sql.Timestamp for a column of TIMESTAMP type, and thus it fails for a LocalDateTime. I really hope the FLIP-95 effort will significantly reduce the number of such problems.
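To make the mismatch concrete, here is a minimal sketch using only the JDK. The class BridgingClassSketch and its methods are hypothetical stand-ins and are not Flink code: serializeTimestamp merely imitates a serializer that accepts only java.sql.Timestamp, and toBridgingClass shows the conversion that would be needed if the bridging class were respected. Note that Timestamp.valueOf interprets the value in the JVM's default time zone, which is an assumption of this sketch and may not match what the planner intends.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;

final class BridgingClassSketch {

    // Hypothetical stand-in for a serializer that only accepts
    // java.sql.Timestamp for TIMESTAMP columns (not Flink's actual code).
    static long serializeTimestamp(Object value) {
        Timestamp ts = (Timestamp) value; // ClassCastException for any other class
        return ts.getTime();
    }

    // Converts the planner-side LocalDateTime into the expected bridging class.
    // Timestamp.valueOf uses the JVM's default time zone for the conversion.
    static Timestamp toBridgingClass(LocalDateTime value) {
        return Timestamp.valueOf(value);
    }

    public static void main(String[] args) {
        LocalDateTime fromPlanner = LocalDateTime.of(2020, 4, 29, 15, 42);

        try {
            // Passing the LocalDateTime directly fails, as in the report.
            serializeTimestamp(fromPlanner);
        } catch (ClassCastException e) {
            System.out.println("Rejected: " + e.getMessage());
        }

        // After converting to the bridging class, the value is accepted.
        long millis = serializeTimestamp(toBridgingClass(fromPlanner));
        System.out.println("Accepted, epoch millis (zone dependent): " + millis);
    }
}
```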

It's definitely worth reporting a bug.

BTW, could you share how you create the Kafka table sink, so that we have the full picture?

Best,

Dawid

On 29/04/2020 15:42, Gyula Fóra wrote:
Hi All!

We are trying to work with Avro-serialized data from Kafka using the Table API, and we want to use the TIMESTAMP column type.

According to the docs, we can use the long type with logicalType: timestamp-millis.
So we use the following Avro field schema in the descriptor:

  {"name": "timestamp_field", "type": {"type":"long", "logicalType": "timestamp-millis"}}

When trying to insert into the table we get the following error:

Caused by: java.lang.ClassCastException: class java.time.LocalDateTime cannot be cast to class java.lang.Long (java.time.LocalDateTime and java.lang.Long are in module java.base of loader 'bootstrap')
	at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)

It seems like the Avro format (serializer) is not aware of the logical type conversion that is needed to convert back to the physical type long.
I looked at the AvroTypesITCase, which uses all kinds of logical types, but I could only find logic that maps between Avro POJOs and tables, and none that actually exercises the serialization/deserialization logic of the format.
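For illustration, the logical-to-physical conversion in question can be sketched with the JDK alone. Avro's timestamp-millis stores milliseconds since the Unix epoch in UTC as a long. The class TimestampMillisSketch and its method names are hypothetical, and treating the LocalDateTime as UTC wall-clock time is an assumption of this sketch, not something the format is confirmed to do.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

final class TimestampMillisSketch {

    // Logical -> physical: LocalDateTime to the long expected by a
    // "timestamp-millis" field. Assumes the value is UTC wall-clock time.
    static long toTimestampMillis(LocalDateTime value) {
        return value.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Physical -> logical: the reverse conversion, under the same assumption.
    static LocalDateTime fromTimestampMillis(long millis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        LocalDateTime logical = LocalDateTime.of(2020, 4, 29, 15, 42);
        long physical = toTimestampMillis(logical);
        System.out.println("physical long: " + physical);
        System.out.println("round trip:    " + fromTimestampMillis(physical));
    }
}
```

Writing the resulting long, rather than the LocalDateTime itself, is what a writer without a registered logical type conversion expects for this field.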

Could someone please help me with this? Maybe what I am trying to do is not possible, or I just missed a crucial step.

Thank you!
Gyula


