Using logicalType in the Avro table format


Gyula Fóra
Hi All!

We are trying to work with Avro-serialized data from Kafka using the Table API, with a TIMESTAMP column type.

According to the docs, we can use the long type with logicalType: timestamp-millis.
So we use the following Avro field schema in the descriptor:

{"name": "timestamp_field", "type": {"type":"long", "logicalType": "timestamp-millis"}}

When trying to insert into the table we get the following error:

Caused by: java.lang.ClassCastException: class java.time.LocalDateTime cannot be cast to class java.lang.Long (java.time.LocalDateTime and java.lang.Long are in module java.base of loader 'bootstrap')
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
It seems like the Avro format (serializer) is not aware of the logical type conversion needed to convert back to the physical type long.
I looked at AvroTypesITCase, which uses all kinds of logical types, but I could only find logic that maps between Avro POJOs and tables, and none that actually exercises the serialization/deserialization logic of the format.
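
For illustration, the kind of conversion that seems to be missing would look roughly like this in Avro's Conversion API (a sketch only, assuming UTC semantics; the class name LocalDateTimeMillisConversion is made up):

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

import org.apache.avro.Conversion;
import org.apache.avro.LogicalType;
import org.apache.avro.Schema;

// Sketch only: maps LocalDateTime (interpreted as UTC) to the epoch millis
// that the timestamp-millis logical type stores physically, and back.
public class LocalDateTimeMillisConversion extends Conversion<LocalDateTime> {

    @Override
    public Class<LocalDateTime> getConvertedType() {
        return LocalDateTime.class;
    }

    @Override
    public String getLogicalTypeName() {
        return "timestamp-millis";
    }

    @Override
    public Long toLong(LocalDateTime value, Schema schema, LogicalType type) {
        return value.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    @Override
    public LocalDateTime fromLong(Long value, Schema schema, LogicalType type) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(value), ZoneOffset.UTC);
    }
}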

Could someone please help me with this? Maybe what I am trying to do is not possible, or I just missed a crucial step.

Thank you!
Gyula


Re: Using logicalType in the Avro table format

Arvid Heise
Hi Gyula,

Can you please check which Avro version you are using?

Avro only supports Java 8 time (java.time.LocalDateTime) after 1.9.2; before that, everything was hardcoded to Joda-Time.

However, I'm not entirely sure where the Java 8 time is coming from in your example, as I'm not familiar with the type system of the Table API.
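
One quick way to check which Avro version is actually on the classpath (a sketch; it reads the Implementation-Version from the jar manifest and prints null if the manifest does not carry it):

import org.apache.avro.Schema;

public class AvroVersionCheck {
    public static void main(String[] args) {
        System.out.println(Schema.class.getPackage().getImplementationVersion());
    }
}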

Best,

Arvid

Re: Using logicalType in the Avro table format

Dawid Wysakowicz
In reply to this post by Gyula Fóra

Hi Gyula,

I have not verified it locally yet, but I think you are hitting yet another problem of the unfinished migration from the old TypeInformation-based type system to the new type system based on DataTypes. As far as I understand it, the information about the bridging class (java.sql.Timestamp in this case) is lost somewhere in the stack. Because this information is lost/not respected, the planner produces a LocalDateTime instead of a proper java.sql.Timestamp. The AvroRowSerializationSchema expects a java.sql.Timestamp for a column of TIMESTAMP type, and thus it fails for LocalDateTime. I really hope the effort of FLIP-95 will significantly reduce the number of such problems.
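
For context, the "bridging class" is the new type system's way of declaring which Java class carries a logical type's values at runtime. A sketch with the DataTypes API (the variable name is illustrative):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

// TIMESTAMP(3) defaults to java.time.LocalDateTime as its conversion class;
// bridgedTo() declares that values should be materialized as
// java.sql.Timestamp instead, the class AvroRowSerializationSchema expects.
DataType timestampType = DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class);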

It's definitely worth reporting a bug.

BTW could you share how you create the Kafka Table sink to have the full picture?

Best,

Dawid

Re: Using logicalType in the Avro table format

Gyula Fóra
Hi!

@Arvid: We are using Avro 1.8, I believe, but this problem seems to come from the Flink side, as Dawid mentioned.

@Dawid: 
Sounds like a reasonable explanation. Here are the actual queries to reproduce it within the SQL client / Table API:

CREATE TABLE source_table (
  int_field INT,
  timestamp_field TIMESTAMP(3)
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'avro_tset',
  'connector.properties.bootstrap.servers' = '<...>',
  'format.type' = 'avro',
  'format.avro-schema' =
    '{
      "type": "record",
      "name": "test",
      "fields" : [
        {"name": "int_field", "type": "int"},
        {"name": "timestamp_field", "type": {"type":"long", "logicalType": "timestamp-millis"}}
      ]
    }'
);

INSERT INTO source_table VALUES (12, TIMESTAMP '1999-11-11 11:11:11');
And the error:
Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.lang.Long
	at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
	at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
	at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
	at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
	at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
	at org.apache.flink.formats.avro.AvroRowSerializationSchema.serialize(AvroRowSerializationSchema.java:143)
I will open a Jira ticket as well with these details.
Thank you!
Gyula


Re: Using logicalType in the Avro table format

Arvid Heise
Hi Gyula,

it may still be worth trying to upgrade to Avro 1.9.2 (it can never hurt) and seeing if this solves your particular problem.
The code path in GenericDatumWriter takes the conversion path, so it might just work. Of course, that depends on the schema being correctly translated to a specific record that uses the new TimeConversions [1].
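
For reference, the conversion path is only taken when the writer's data model has a matching conversion registered, roughly like this against the 1.9.2 API (a sketch; schemaJson stands for the schema string from your descriptor, and as far as I can tell TimestampMillisConversion converts java.time.Instant rather than LocalDateTime):

import org.apache.avro.Schema;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;

// The writer consults its data model for a registered Conversion before
// falling back to the raw physical type (a plain long here).
Schema schema = new Schema.Parser().parse(schemaJson);
GenericData model = new GenericData();
model.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
GenericDatumWriter<Object> writer = new GenericDatumWriter<>(schema, model);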


Re: Using logicalType in the Avro table format

Gyula Fóra
Hi Arvid!

I tried it with Avro 1.9.2, and it led to the same error.
It seems like Avro cannot find the conversion class between LocalDateTime and timestamp-millis.
I'm not sure how exactly this works; maybe we need to set the conversions ourselves?

Thanks!
Gyula

Re: Using logicalType in the Avro table format

Arvid Heise
Okay, bummer, but not completely unexpected.

The conversions should be automatically compiled into SpecificRecords. I'm not sure how the Table API does it internally; I just saw the specific-record path in your stack trace and figured it was worth trying out.
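
For illustration, this is roughly the pattern the Avro 1.9.x code generator emits into a record with logical-type fields (a sketch, not the exact generated source; GeneratedRecordSketch is a made-up name):

import org.apache.avro.data.TimeConversions;
import org.apache.avro.specific.SpecificData;

// Illustrative: a generated SpecificRecord carries its own SpecificData
// model with the conversions for its logical-type fields pre-registered,
// so SpecificDatumWriter can find them without manual setup.
public class GeneratedRecordSketch {
    private static final SpecificData MODEL$ = new SpecificData();
    static {
        MODEL$.addLogicalTypeConversion(
            new TimeConversions.TimestampMillisConversion());
    }
}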
