Re: Using logicalType in the Avro table format

Posted by Gyula Fóra on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Using-logicalType-in-the-Avro-table-format-tp34803p34843.html

Hi!

@Arvid: We are using Avro 1.8, I believe, but this problem seems to come from the Flink side, as Dawid mentioned.

@Dawid: 
Sounds like a reasonable explanation. Here are the actual queries to reproduce it in the SQL client / Table API:

CREATE TABLE source_table (
  int_field INT,
  timestamp_field TIMESTAMP(3)
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'avro_tset',
  'connector.properties.bootstrap.servers' = '<...>',
  'format.type' = 'avro',
  'format.avro-schema' =
    '{
      "type": "record",
      "name": "test",
      "fields" : [
        {"name": "int_field", "type": "int"},
        {"name": "timestamp_field", "type": {"type":"long", "logicalType": "timestamp-millis"}}
      ]
    }'
);

INSERT INTO source_table VALUES (12, TIMESTAMP '1999-11-11 11:11:11');
And the error:
Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.lang.Long
	at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
	at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
	at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
	at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
	at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
	at org.apache.flink.formats.avro.AvroRowSerializationSchema.serialize(AvroRowSerializationSchema.java:143)
I will open a Jira ticket as well with these details.
Thank you!
Gyula


On Thu, Apr 30, 2020 at 10:05 AM Dawid Wysakowicz <[hidden email]> wrote:

Hi Gyula,

I have not verified it locally yet, but I think you are hitting yet another problem of the unfinished migration from the old TypeInformation-based type system to the new type system based on DataTypes. As far as I understand the problem, the information about the bridging class (java.sql.Timestamp in this case) is lost somewhere in the stack. Because this information is lost/not respected, the planner produces a LocalDateTime instead of a proper java.sql.Timestamp. The AvroRowSerializationSchema expects java.sql.Timestamp for a column of TIMESTAMP type and thus fails for LocalDateTime. I really hope the FLIP-95 effort will significantly reduce the number of such problems.
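
To make the bridging class idea a bit more concrete, here is a rough sketch against the new DataTypes API (purely an illustration of the two conversion classes involved, not a workaround):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public class BridgingClassSketch {
    public static void main(String[] args) {
        // Default bridging class of TIMESTAMP(3) in the new type system:
        DataType byDefault = DataTypes.TIMESTAMP(3);
        System.out.println(byDefault.getConversionClass());   // class java.time.LocalDateTime

        // The class the old AvroRowSerializationSchema effectively expects:
        DataType bridged = DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class);
        System.out.println(bridged.getConversionClass());     // class java.sql.Timestamp
    }
}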

It's definitely worth reporting a bug.

BTW could you share how you create the Kafka Table sink to have the full picture?

Best,

Dawid

On 29/04/2020 15:42, Gyula Fóra wrote:
Hi All!

We are trying to work with Avro-serialized data from Kafka using the Table API and a TIMESTAMP column type.

According to the docs, we can use the long type with the logicalType timestamp-millis.
So we use the following Avro field schema in the descriptor:

  {"name": "timestamp_field", "type": {"type":"long", "logicalType": "timestamp-millis"}}
When trying to insert into the table we get the following error:

Caused by: java.lang.ClassCastException: class java.time.LocalDateTime cannot be cast to class java.lang.Long (java.time.LocalDateTime and java.lang.Long are in module java.base of loader 'bootstrap')
	at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
It seems like the Avro format (serializer) is not aware of the logical type conversion needed to convert back to the physical type long.
I looked at AvroTypesITCase, which uses all kinds of logical types, but I could only find logic that maps between Avro POJOs and tables, and none that actually exercises the serialization/deserialization logic of the format.
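
To make it concrete, the conversion that appears to be missing before the value reaches the GenericDatumWriter is roughly the following (a hypothetical helper assuming UTC semantics for timestamp-millis, not actual Flink code):

import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class TimestampMillisSketch {
    // Turn the LocalDateTime the planner produces into the physical long (epoch millis).
    static long toEpochMillis(LocalDateTime value) {
        return value.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis(LocalDateTime.of(1999, 11, 11, 11, 11, 11)));
    }
}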

Could someone please help me with this? Maybe what I am trying to do is not possible, or I just missed a crucial step.

Thank you!
Gyula