Hi,
I previously wrote about a problem I believed was caused by Kryo serialization (https://mail-archives.apache.org/mod_mbox/flink-user/202101.mbox/%3Cf9c44b72-f221-8cd3-24cc-b1090345c759@...%3E), but I am no longer sure that is the case.

I have a job that involves a TableScan via a custom source operator that generates a DataStream[RowData], a UDF that parses a String into an io.circe.Json object (which internally flows as a RAW('io.circe.Json') data type), and an AggregateFunction with a java.util.List accumulator that returns one of these objects. The aggregate is used in a tumbling window as follows:

SELECT any_json_array_value(parse_array(resources)) as resources_sample
FROM foo
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR)

It generates the following physical plan:

optimize result:
Sink(table=[catalog.default-db.foo], fields=[resources_sample])
+- GroupWindowAggregate(window=[TumblingGroupWindow('w$, event_time, 3600000)], select=[any_json_array_value($f1) AS resources_sample])
   +- Exchange(distribution=[single])
      +- Calc(select=[event_time, parse_array(resources) AS $f1])
         +- WatermarkAssigner(rowtime=[event_time], watermark=[event_time])
            +- TableSourceScan(table=[[catalog, default-db, foo]], fields=[resources])

When I run the job, I receive the following exception after 10 to 30 seconds (the timing varies, which makes me suspect some kind of race condition):

Caused by: java.io.IOException: Can't get next record for channel InputChannelInfo{gateIdx=0, inputChannelIdx=0}
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:166)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:339)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:111)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:86)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:163)
    ... 8 more
Caused by: java.lang.IndexOutOfBoundsException: pos: 140289414591019, length: 546153590, index: 43, offset: 0
    at org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:190)
    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101)
    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92)
    at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:100)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:108)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:48)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
    ... 11 more

Note that I am currently using the built-in Kryo serialization with no custom serializers.

On further investigation, it seems that StreamElementSerializer is receiving a corrupt stream, which makes it think it received a record with a timestamp:

[image.png]

As you can see, the tag is incorrectly read as 0, so a timestamp is read next, yielding an invalid value (16). When the TypeSerializer (BinaryRowDataSerializer) then tries to decode the record, it fails with the IndexOutOfBoundsException.

Interestingly, when I disable operator chaining completely (via StreamExecutionEnvironment.disableOperatorChaining), the problem does not reproduce.
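A simplified model may help make the failure mode above concrete. As I understand StreamElementSerializer, a record is written as a one-byte tag (0 meaning "record with timestamp"), then an 8-byte timestamp when that tag is set, then the value; for a binary row, the value starts with an int length followed by that many bytes. The sketch below is a hypothetical, stand-alone model of that framing, not Flink's code: the class FramingModel and its methods are made up, and the real format has more element types. It shows that if the reader starts even one byte off, it can still see tag 0 and then decodes unrelated bytes as a timestamp and a length.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical, simplified model of record framing (not Flink's implementation):
 * [tag: 1 byte][timestamp: 8 bytes, only if tag == 0][length: 4 bytes][payload].
 * Watermarks and other element types are not modelled.
 */
final class FramingModel {
    static final byte TAG_REC_WITH_TIMESTAMP = 0;

    /** Writes one timestamped record with a length-prefixed payload. */
    static byte[] writeRecord(long timestamp, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 8 + 4 + payload.length); // big-endian by default
        buf.put(TAG_REC_WITH_TIMESTAMP);
        buf.putLong(timestamp);
        buf.putInt(payload.length);
        buf.put(payload);
        return buf.array();
    }

    /** Decodes the header starting at {@code offset} and returns "tag/timestamp/length". */
    static String readHeader(byte[] bytes, int offset) {
        ByteBuffer buf = ByteBuffer.wrap(bytes, offset, bytes.length - offset);
        int tag = buf.get() & 0xFF;
        // Only tag 0 carries a timestamp in this model; -1 marks "no timestamp".
        long timestamp = (tag == TAG_REC_WITH_TIMESTAMP) ? buf.getLong() : -1L;
        int length = buf.getInt();
        return tag + "/" + timestamp + "/" + length;
    }

    public static void main(String[] args) {
        byte[] record = writeRecord(1_611_826_140_000L, "{\"a\":1}".getBytes(StandardCharsets.UTF_8));
        // Aligned read: tag 0, the original timestamp and the 7-byte payload length.
        System.out.println(readHeader(record, 0));
        // Read starting one byte late: the high byte of the timestamp is 0, so it is
        // mistaken for tag 0, and the following bytes decode to an unrelated
        // timestamp and length.
        System.out.println(readHeader(record, 1));
    }
}
```

In this model the misaligned read still "succeeds" at the header level; the damage only surfaces once the bogus length is used to read the row, which is consistent with the IndexOutOfBoundsException coming from BinaryRowDataSerializer.deserialize above. The model illustrates the symptom only; it says nothing about what causes the corruption in the real job.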
I am wondering which parts of the networking and serialization stack I should look at to investigate this further and understand what is causing the corrupt stream, or whether there are additional logs that could help.

Best Regards,
Yuval Itzchakov.
Hi Yuval,
we should definitely find the root cause of this issue. It helps that the exception happens frequently; that makes it easier to nail down the problem.

Have you tried replacing the JSON object with a regular String? If the exception is gone after this change, I believe it must be the serialization and not the network stack.

Regards,
Timo

On 28.01.21 10:29, Yuval Itzchakov wrote:
> [...]
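Timo's suggestion amounts to isolating the serializer from the network stack. One way to do that outside a running job is a round-trip check: serialize a value, deserialize it with the matching reader, and confirm the reader consumed exactly the bytes that were written. An over-read is the same symptom NonSpanningWrapper reports as "Serializer consumed more bytes than the record had". The harness below is a hypothetical plain-Java sketch built on java.io streams; RoundTripCheck and the two deliberately mismatched String serializers are made up for illustration. Flink's DataOutputView and DataInputView extend DataOutput and DataInput, so a real TypeSerializer could probably be adapted (for example via DataOutputViewStreamWrapper and DataInputViewStreamWrapper), but that adaptation is an assumption and is not shown.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical round-trip check: does a reader consume exactly what a writer produced? */
final class RoundTripCheck {

    @FunctionalInterface
    interface Writer<T> {
        void write(T value, DataOutput out) throws IOException;
    }

    @FunctionalInterface
    interface Reader<T> {
        T read(DataInput in) throws IOException;
    }

    /**
     * Returns how many written bytes the reader left unread (0 means symmetric),
     * or -1 if the reader ran past the end of what was written.
     */
    static <T> int leftoverBytes(T value, Writer<T> writer, Reader<T> reader) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            writer.write(value, out);
            out.flush();
            ByteArrayInputStream source = new ByteArrayInputStream(buffer.toByteArray());
            reader.read(new DataInputStream(source));
            return source.available();
        } catch (EOFException e) {
            return -1; // over-read: same symptom as "consumed more bytes than the record had"
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Made-up buggy writer: uses a 2-byte length prefix.
    static void writeShortPrefixed(String value, DataOutput out) throws IOException {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        out.writeShort(bytes.length);
        out.write(bytes);
    }

    // Made-up buggy reader: expects a 4-byte length prefix, so it misreads the payload.
    static String readIntPrefixed(DataInput in) throws IOException {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Symmetric pair: writeUTF/readUTF agree on the format, so nothing is left over.
        System.out.println(leftoverBytes("hi", (v, out) -> out.writeUTF(v), DataInput::readUTF));
        // Mismatched pair: the reader treats "hi" as part of a huge length and over-reads.
        System.out.println(leftoverBytes("hi", RoundTripCheck::writeShortPrefixed,
                RoundTripCheck::readIntPrefixed));
    }
}
```

Passing this check does not rule out the concurrency problem suspected in the thread; it only separates a plain write/read mismatch from other causes.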
Hi Timo,

I tried replacing it with an ordinary ARRAY<STRING> DataType, which doesn't reproduce the issue. If I use a RawType(Array[String]), the problem still manifests, so I assume it's not directly related to the Kryo serialization of the specific underlying type (io.circe.Json), but to something in the way it interacts with BinaryRawValueData and the writing out to the network buffer behind the scenes.

Best Regards,
Yuval Itzchakov.

On Thu, Jan 28, 2021 at 5:26 PM Timo Walther wrote:
> [...]
This is helpful information. So I guess the problem must be in the flink-table module and not in flink-core. I will try to reserve some time tomorrow to look into the code again. How did you express RawType(Array[String])? Again with a fully serialized type string?

Could it be related to https://issues.apache.org/jira/browse/FLINK-20986 ?

Regards,
Timo

On 28.01.21 16:30, Yuval Itzchakov wrote:
> [...]
Hi Timo,

The code example I posted doesn't really match the code that is causing this issue. I tried to extend it a bit but couldn't reproduce the problem with it. I am no longer using the serialized type strings; instead, I register the custom serializers with the runtime during bootstrap and override getTypeInference to provide the raw data type.

But again, I disabled the custom serializer for the test to make sure it is not the one causing the issues.

Regarding FLINK-20986, I'm not sure, but I am no longer using the old type system, so everything should pass through InternalTypeInfo and RawType. I don't see any type equality issues, and I see the same serializer being invoked for both serialization and deserialization.

Best Regards,
Yuval Itzchakov.

On Thu, Jan 28, 2021 at 5:51 PM Timo Walther wrote:
> [...]
FYI: Yuval and I scheduled a call to investigate this serialization
issue remotely on Monday. If you have any idea by looking at the code beforehand, let us know. On 28.01.21 16:57, Yuval Itzchakov wrote: > Hi Timo, > > The code example I posted doesn't really match the code that is causing > this issue. I tried to extend it a bit but couldn't make the > reproduction work there. > I am no longer using the serialized strings, but registering the custom > serializers with the runtime during bootstrap and overriding > getTypeInference to provide the raw data type. > > But again, I disabled the custom serializer for the test to make sure it > is not the one causing the issues. > > Regarding FLINK-20986 > <https://issues.apache.org/jira/browse/FLINK-20986>, I'm not sure but I > am no longer using the old type system so everything should pass through > InternalTypeInfo and RawType. I don't see any type equality issues, and > I see the same serializer being invoked for both serialization and > deserialization. > > On Thu, Jan 28, 2021 at 5:51 PM Timo Walther <[hidden email] > <mailto:[hidden email]>> wrote: > > This is helpful information. So I guess the problem must be in the > flink-table module and not in flink-core. I will try to reserve some > time tomorrow to look into the code again. How did you express > RawType(Array[String])? Again with fully serialized type string? > > Could it be related to > https://issues.apache.org/jira/browse/FLINK-20986 > <https://issues.apache.org/jira/browse/FLINK-20986> ? > > Regards, > Timo > > > On 28.01.21 16:30, Yuval Itzchakov wrote: > > Hi Timo, > > > > I tried replacing it with an ordinary ARRAY<STRING> DataType, which > > doesn't reproduce the issue. > > If I use a RawType(Array[String]), the problem still manifests, so I > > assume it's not directly related to a Kryo serialization of the > specific > > underlying type (io.circe.Json), but something in the way it > interacts > > with BinaryRawValueData and writing out to the network buffer > behind the > > scenes. 
> > > > On Thu, Jan 28, 2021 at 5:26 PM Timo Walther <[hidden email] > <mailto:[hidden email]> > > <mailto:[hidden email] <mailto:[hidden email]>>> wrote: > > > > Hi Yuval, > > > > we should definitely find the root cause of this issue. It > helps if the > > exception happens frequently to nail down the problem. > > > > Have you tried to replace the JSON object with a regular > String? If the > > exception is gone after this change. I believe it must be the > > serialization and not the network stack. > > > > Regards, > > Timo > > > > > > On 28.01.21 10:29, Yuval Itzchakov wrote: > > > Hi, > > > > > > I previously wrote about a problem I believed was caused > by Kryo > > > serialization > > > > > > (https://mail-archives.apache.org/mod_mbox/flink-user/202101.mbox/%3Cf9c44b72-f221-8cd3-24cc-b1090345c759@...%3E <https://mail-archives.apache.org/mod_mbox/flink-user/202101.mbox/%3Cf9c44b72-f221-8cd3-24cc-b1090345c759@...%3E> > > > <https://mail-archives.apache.org/mod_mbox/flink-user/202101.mbox/%3Cf9c44b72-f221-8cd3-24cc-b1090345c759@...%3E <https://mail-archives.apache.org/mod_mbox/flink-user/202101.mbox/%3Cf9c44b72-f221-8cd3-24cc-b1090345c759@...%3E>> > > > > > > > > <https://mail-archives.apache.org/mod_mbox/flink-user/202101.mbox/%3Cf9c44b72-f221-8cd3-24cc-b1090345c759@...%3E <https://mail-archives.apache.org/mod_mbox/flink-user/202101.mbox/%3Cf9c44b72-f221-8cd3-24cc-b1090345c759@...%3E> > > > <https://mail-archives.apache.org/mod_mbox/flink-user/202101.mbox/%3Cf9c44b72-f221-8cd3-24cc-b1090345c759@...%3E <https://mail-archives.apache.org/mod_mbox/flink-user/202101.mbox/%3Cf9c44b72-f221-8cd3-24cc-b1090345c759@...%3E>>>), > > > > > which I am no longer sure is the case. 
> I have a job which involves a TableScan via a custom source operator which
> generates a DataStream[RowData], a UDF to parse out a String =>
> io.circe.Json object (which internally flows as a RAW('io.circe.Json')
> data type), and then an AggregateFunction with a java.util.List accumulator
> which returns one of these objects and is used in a tumbling window as
> follows:
>
> SELECT any_json_array_value(parse_array(resources)) as resources_sample
> FROM foo
> GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR)
>
> It generates the following physical plan:
>
> optimize result:
> Sink(table=[catalog.default-db.foo], fields=[resources_sample])
> +- GroupWindowAggregate(window=[TumblingGroupWindow('w$, event_time, 3600000)], select=[any_json_array_value($f1) AS resources_sample])
>    +- Exchange(distribution=[single])
>       +- Calc(select=[event_time, parse_array(resources) AS $f1])
>          +- WatermarkAssigner(rowtime=[event_time], watermark=[event_time])
>             +- TableSourceScan(table=[[catalog, default-db, foo]], fields=[resources])
>
> When I run my job, I receive the following exception after 10 to 30
> seconds (the timing varies, which gives me a hunch that this is related to
> some race condition):
>
> Caused by: java.io.IOException: Can't get next record for channel InputChannelInfo{gateIdx=0, inputChannelIdx=0}
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:166)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>     at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:339)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:111)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:86)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:163)
>     ... 8 more
> Caused by: java.lang.IndexOutOfBoundsException: pos: 140289414591019, length: 546153590, index: 43, offset: 0
>     at org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:190)
>     at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101)
>     at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92)
>     at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:100)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:108)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:48)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
>     at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
>     ... 11 more
>
> Just to note: right now I am using the built-in Kryo serialization with no
> custom serializers.
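The "Serializer consumed more bytes than the record had" message comes from the network-layer deserializer, which knows how many bytes each record occupies and reports an error when the type serializer reads past that boundary. The trace is consistent with this: BinaryRowDataSerializer.deserialize asks readFully for 546153590 bytes, far more than any single record holds. The Java sketch below is a simplified, hypothetical model of that idea, not Flink code (the class FramingSketch and all of its methods are invented for illustration): each record is written as a 4-byte length plus payload, the payload is itself a length-prefixed row, and overwriting the inner length with the value from the trace makes the reader request more bytes than the frame contains.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical, simplified model of length-prefixed record framing.
// This is NOT Flink's implementation; it only illustrates the failure mode.
public class FramingSketch {

    // Network-level frame: [int frameLength][payload bytes].
    static byte[] frame(byte[] payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(payload.length);
        out.write(payload);
        return bytes.toByteArray();
    }

    // Payload written by a length-prefixed row serializer: [int rowLength][row bytes].
    static byte[] rowPayload(byte[] row) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(row.length);
        out.write(row);
        return bytes.toByteArray();
    }

    // Reads one framed record; the row reader may only consume bytes inside its frame.
    static byte[] readRecord(byte[] framed) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(framed));
        int frameLength = in.readInt();
        byte[] frameBytes = new byte[frameLength];
        in.readFully(frameBytes);

        DataInputStream record = new DataInputStream(new ByteArrayInputStream(frameBytes));
        int rowLength = record.readInt();
        int remaining = frameLength - Integer.BYTES;
        // Check before allocating, so a garbage length does not trigger a huge allocation.
        if (rowLength < 0 || rowLength > remaining) {
            throw new IOException("Serializer consumed more bytes than the record had: wanted "
                    + rowLength + ", frame has " + remaining);
        }
        byte[] row = new byte[rowLength];
        record.readFully(row);
        return row;
    }

    // Returns a copy of a framed record whose inner row length is overwritten.
    static byte[] corruptRowLength(byte[] framed, int bogusLength) {
        byte[] copy = Arrays.copyOf(framed, framed.length);
        // Offset 4 skips the frame length; ByteBuffer is big-endian, like DataOutputStream.
        ByteBuffer.wrap(copy).putInt(Integer.BYTES, bogusLength);
        return copy;
    }

    public static void main(String[] args) throws IOException {
        byte[] row = {1, 2, 3, 4, 5};
        byte[] framed = frame(rowPayload(row));
        System.out.println(Arrays.equals(row, readRecord(framed))); // true

        try {
            readRecord(corruptRowLength(framed, 546153590));
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The outer frame is what lets the reader notice the problem at all: without it, a bogus inner length would silently swallow bytes belonging to the following records. In the sketch the error surfaces in the row reader, which matches the trace, although the real corruption may have happened earlier in the element.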
>
> Upon further investigation, it seems that StreamElementSerializer is
> receiving a corrupt stream, which causes it to think it received a record
> with a timestamp:
>
> [image: image.png]
>
> As you can see, the tag is incorrectly read as 0, so a timestamp is then
> read, yielding an invalid value (16). Once the TypeSerializer
> (BinaryRowDataSerializer) tries to decode the rest of the record, it fails
> with the index out of bounds exception.
>
> The interesting thing is that when I *disable operator chaining* completely
> via StreamExecutionEnvironment.disableOperatorChaining, the problem does not
> reproduce.
>
> I am wondering which parts of the networking and serialization stack could
> help me investigate this issue further and understand what is causing the
> corrupt stream, or whether there are additional logs that could assist.
>
> --
> Best Regards,
> Yuval Itzchakov.
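The tag observation above can be illustrated with a small model. As far as I know, Flink's StreamElementSerializer writes a one-byte tag before each element, where 0 marks a record that carries a timestamp (matching the description above) and 1 a record without one; treat the value of tag 1 as an assumption. The Java sketch below is hypothetical (TagSketch and its methods are invented, it models only those two tags, and it uses a simple length-prefixed row rather than Flink's actual row format). It shows that flipping only the tag byte from 1 to 0 makes the reader consume the row length and the first row bytes as a timestamp, after which the "row length" it reads is nonsense.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical, simplified model of a tag-prefixed stream element format.
// Only two tags are modelled; tag 1 = "record without timestamp" is an assumption.
public class TagSketch {
    static final byte TAG_REC_WITH_TIMESTAMP = 0;
    static final byte TAG_REC_WITHOUT_TIMESTAMP = 1;

    // Writes [tag][optional long timestamp][int rowLength][row bytes].
    static byte[] write(byte[] row, Long timestamp) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        if (timestamp != null) {
            out.writeByte(TAG_REC_WITH_TIMESTAMP);
            out.writeLong(timestamp);
        } else {
            out.writeByte(TAG_REC_WITHOUT_TIMESTAMP);
        }
        out.writeInt(row.length);
        out.write(row);
        return bytes.toByteArray();
    }

    // Decodes one element; the tag alone decides whether 8 timestamp bytes are consumed.
    static String read(byte[] element) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(element));
        byte tag = in.readByte();
        String ts;
        if (tag == TAG_REC_WITH_TIMESTAMP) {
            ts = "timestamp=" + in.readLong();
        } else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
            ts = "no timestamp";
        } else {
            throw new IOException("unsupported tag in this sketch: " + tag);
        }
        int rowLength = in.readInt();
        if (rowLength < 0 || rowLength > in.available()) {
            throw new IOException(ts + ", bogus row length " + rowLength
                    + ", bytes left " + in.available());
        }
        byte[] row = new byte[rowLength];
        in.readFully(row);
        return ts + ", row of " + rowLength + " bytes";
    }

    public static void main(String[] args) throws IOException {
        byte[] row = {1, 2, 3, 4, 5, 6, 7, 8, 9};
        byte[] element = write(row, null);
        System.out.println(read(element)); // no timestamp, row of 9 bytes

        // Flip only the tag byte: the length and first row bytes are now read as a timestamp.
        byte[] corrupted = element.clone();
        corrupted[0] = TAG_REC_WITH_TIMESTAMP;
        try {
            System.out.println(read(corrupted));
        } catch (IOException e) {
            // timestamp=38671614724, bogus row length 84281096, bytes left 1
            System.out.println(e.getMessage());
        }
    }
}
```

The format is not self-describing, so a single wrong byte shifts every later field: the nonsense timestamp and the huge length are symptoms that show up far from the actual cause. The sketch says nothing about why the stream in this job became corrupt.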