Flink 1.12 cannot handle large schema

Lian Jiang
Hi,

I am using a Flink 1.12 snapshot built on my machine. My job throws an exception when it calls writeUTF on a schema fetched from the schema registry.

Caused by: java.io.UTFDataFormatException: encoded string too long: 223502 bytes
at java.io.DataOutputStream.writeUTF(DataOutputStream.java:364)
at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)
at org.apache.flink.formats.avro.typeutils.AvroSerializerSnapshot.writeSnapshot(AvroSerializerSnapshot.java:75)
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.writeVersionedSnapshot(TypeSerializerSnapshot.java:153)
at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.writeNestedSerializerSnapshots(NestedSerializersSnapshotDelegate.java:159)
at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.writeSnapshot(CompositeTypeSerializerSnapshot.java:148)
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.writeVersionedSnapshot(TypeSerializerSnapshot.java:153)
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.write(TypeSerializerSnapshotSerializationUtil.java:138)
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:55)
at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentWriterImpl.writeStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:183)
at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.write(KeyedBackendSerializationProxy.java:126)
at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:171)
at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:510)
... 5 common frames omitted

According to https://stackoverflow.com/questions/22741556/dataoutputstream-purpose-of-the-encoded-string-too-long-restriction, java.io.DataOutputStream.writeUTF can only write strings whose encoded form is at most 65535 bytes. Because of this limit, my job cannot deserialize the Kafka messages. Any ideas are highly appreciated!
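For what it's worth, the limit is easy to reproduce in isolation. Below is a minimal, self-contained sketch (my own illustration, not code from the job) that triggers the same exception with a string of the size reported above:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.UTFDataFormatException;

public class WriteUtfLimitDemo {
    public static void main(String[] args) throws Exception {
        // writeUTF prefixes the payload with an unsigned 16-bit length, so any
        // string whose modified-UTF-8 encoding exceeds 65535 bytes fails.
        // 223502 ASCII chars encode to 223502 bytes, matching the error above.
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 223502; i++) {
            sb.append('a');
        }
        try (DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream())) {
            out.writeUTF(sb.toString());
        } catch (UTFDataFormatException e) {
            // Prints: Reproduced: encoded string too long: 223502 bytes
            System.out.println("Reproduced: " + e.getMessage());
        }
    }
}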


Regards
Lian

Re: Flink 1.12 cannot handle large schema

Arvid Heise
Hi Lian,

Thank you for reporting. It looks like a bug to me and I created a ticket [1].

You have two options: wait for the fix, or implement it yourself (copy AvroSerializerSnapshot and use another way to write/read the schema, then subclass AvroSerializer to use your snapshot). Of course, we are happy to accept any patch.
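To make the workaround concrete, here is a minimal sketch of the part you would change in the copied AvroSerializerSnapshot, assuming you replace the writeUTF call with an int-length-prefixed UTF-8 byte array (the helper class and method names are mine, purely illustrative):

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.avro.Schema;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

// Hypothetical helper for a patched snapshot: avoids DataOutput.writeUTF,
// which caps the encoded string at 65535 bytes.
final class LargeSchemaIO {

    static void writeSchema(Schema schema, DataOutputView out) throws IOException {
        byte[] bytes = schema.toString().getBytes(StandardCharsets.UTF_8);
        out.writeInt(bytes.length); // 4-byte length prefix instead of writeUTF's 2-byte one
        out.write(bytes);
    }

    static Schema readSchema(DataInputView in) throws IOException {
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readFully(bytes);
        return new Schema.Parser().parse(new String(bytes, StandardCharsets.UTF_8));
    }
}

Note that a snapshot written this way is not binary-compatible with the stock AvroSerializerSnapshot format, so savepoints taken with the unpatched serializer would not restore against it.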


Re: Flink 1.12 cannot handle large schema

Lian Jiang
Thanks, Arvid, for filing the ticket and for the workaround. I will monitor the ticket and retry once the fix is available, and I am happy to help test the fix while it is still on a private branch. Regards!
