Flink 1.12 Kryo Serialization Error

classic Classic list List threaded Threaded
11 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink 1.12 Kryo Serialization Error

Yuval Itzchakov
Hi,

I've implemented a KryoSerializer for a specific JSON type in my application as I have a bunch of UDFs that depend on a RAW('io.circe.Json') encoder being available. The implementation is rather simple. When I run my Flink application with Kryo in trace logs, I see that data gets properly serialized / deserialized using the serializer. However, after about 30 seconds, the application blows up with the following error:

Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IndexOutOfBoundsException: pos: 140513145180741, length: 733793654, index: 69, offset: 0
at org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:198)
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101)
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92)
at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:103)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
... 11 more

Or with the following exception:

Caused by: java.lang.NegativeArraySizeException
at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:102)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)

I have, however, checked that the serialization works properly and there is no issue there.
I have the following registration during bootstrap:

flink.registerType(classOf[Json])
flink.addDefaultKryoSerializer(classOf[Json], classOf[JsonKryoSerializer])

And the following is the implementation of the serializer: 

import com.esotericsoftware.kryo.io.{ Input, Output }
import com.esotericsoftware.kryo.{ Kryo, Serializer }
import io.circe.Json
import io.circe.jawn.JawnParser

final class JsonKryoSerializer extends Serializer[Json](true, false) with Serializable {
  private val jawnParser = new JawnParser()

  override def write(kryo: Kryo, output: Output, `object`: Json): Unit =
    output.writeString(`object`.noSpaces)

  override def read(kryo: Kryo, input: Input, `type`: Class[Json]): Json = {
    val str = input.readString()

    if (str == null) Json.Null
    else
      jawnParser.parse(str) match {
        case Left(err)    => throw err
        case Right(value) => value
      }
  }

  override def copy(kryo: Kryo, original: Json): Json =
    jawnParser.parse(original.noSpaces) match {
      case Left(err)    => throw err
      case Right(value) => value
    }
}

Would appreciate any help on how to debug this further.

--
Best Regards,
Yuval Itzchakov.
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.12 Kryo Serialization Error

Yuval Itzchakov
Further debugging this issue, this currently seems unrelated to Kryo at all.

I have a stage that emits a case class down the stream. I can see the serialization part works fine, but when the receiving side is attempting to deserialize the the case class
it receives a NonSpanningWrapper that has already surpassed it's buffer limit:

image.png

Any help would be greatly appreciated.

On Mon, Jan 11, 2021 at 1:49 PM Yuval Itzchakov <[hidden email]> wrote:
Hi,

I've implemented a KryoSerializer for a specific JSON type in my application as I have a bunch of UDFs that depend on a RAW('io.circe.Json') encoder being available. The implementation is rather simple. When I run my Flink application with Kryo in trace logs, I see that data gets properly serialized / deserialized using the serializer. However, after about 30 seconds, the application blows up with the following error:

Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IndexOutOfBoundsException: pos: 140513145180741, length: 733793654, index: 69, offset: 0
at org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:198)
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101)
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92)
at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:103)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
... 11 more

Or with the following exception:

Caused by: java.lang.NegativeArraySizeException
at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:102)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)

I have, however, checked that the serialization works properly and there is no issue there.
I have the following registration during bootstrap:

flink.registerType(classOf[Json])
flink.addDefaultKryoSerializer(classOf[Json], classOf[JsonKryoSerializer])

And the following is the implementation of the serializer: 

import com.esotericsoftware.kryo.io.{ Input, Output }
import com.esotericsoftware.kryo.{ Kryo, Serializer }
import io.circe.Json
import io.circe.jawn.JawnParser

final class JsonKryoSerializer extends Serializer[Json](true, false) with Serializable {
  private val jawnParser = new JawnParser()

  override def write(kryo: Kryo, output: Output, `object`: Json): Unit =
    output.writeString(`object`.noSpaces)

  override def read(kryo: Kryo, input: Input, `type`: Class[Json]): Json = {
    val str = input.readString()

    if (str == null) Json.Null
    else
      jawnParser.parse(str) match {
        case Left(err)    => throw err
        case Right(value) => value
      }
  }

  override def copy(kryo: Kryo, original: Json): Json =
    jawnParser.parse(original.noSpaces) match {
      case Left(err)    => throw err
      case Right(value) => value
    }
}

Would appreciate any help on how to debug this further.

--
Best Regards,
Yuval Itzchakov.


--
Best Regards,
Yuval Itzchakov.
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.12 Kryo Serialization Error

Yuval Itzchakov
OK, this turned out to actually be a problem with the Kryo serialization. For some reason, it does not like that I try to generate a JSON with no spaces, only when I use 2 spaces will it work properly.
I am at loss of words.

Just to emphasize the difference:

No Spaces:

image.png
Doesn't work.

With spaces:

image.png

works fine.



On Mon, Jan 11, 2021 at 3:01 PM Yuval Itzchakov <[hidden email]> wrote:
Further debugging this issue, this currently seems unrelated to Kryo at all.

I have a stage that emits a case class down the stream. I can see the serialization part works fine, but when the receiving side is attempting to deserialize the the case class
it receives a NonSpanningWrapper that has already surpassed it's buffer limit:

image.png

Any help would be greatly appreciated.

On Mon, Jan 11, 2021 at 1:49 PM Yuval Itzchakov <[hidden email]> wrote:
Hi,

I've implemented a KryoSerializer for a specific JSON type in my application as I have a bunch of UDFs that depend on a RAW('io.circe.Json') encoder being available. The implementation is rather simple. When I run my Flink application with Kryo in trace logs, I see that data gets properly serialized / deserialized using the serializer. However, after about 30 seconds, the application blows up with the following error:

Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IndexOutOfBoundsException: pos: 140513145180741, length: 733793654, index: 69, offset: 0
at org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:198)
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101)
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92)
at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:103)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
... 11 more

Or with the following exception:

Caused by: java.lang.NegativeArraySizeException
at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:102)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)

I have, however, checked that the serialization works properly and there is no issue there.
I have the following registration during bootstrap:

flink.registerType(classOf[Json])
flink.addDefaultKryoSerializer(classOf[Json], classOf[JsonKryoSerializer])

And the following is the implementation of the serializer: 

import com.esotericsoftware.kryo.io.{ Input, Output }
import com.esotericsoftware.kryo.{ Kryo, Serializer }
import io.circe.Json
import io.circe.jawn.JawnParser

final class JsonKryoSerializer extends Serializer[Json](true, false) with Serializable {
  private val jawnParser = new JawnParser()

  override def write(kryo: Kryo, output: Output, `object`: Json): Unit =
    output.writeString(`object`.noSpaces)

  override def read(kryo: Kryo, input: Input, `type`: Class[Json]): Json = {
    val str = input.readString()

    if (str == null) Json.Null
    else
      jawnParser.parse(str) match {
        case Left(err)    => throw err
        case Right(value) => value
      }
  }

  override def copy(kryo: Kryo, original: Json): Json =
    jawnParser.parse(original.noSpaces) match {
      case Left(err)    => throw err
      case Right(value) => value
    }
}

Would appreciate any help on how to debug this further.

--
Best Regards,
Yuval Itzchakov.


--
Best Regards,
Yuval Itzchakov.


--
Best Regards,
Yuval Itzchakov.
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.12 Kryo Serialization Error

Chesnay Schepler
I would think that the likely explanation is some bug in the formatting code of the library you are using.
Just for fun you could try manually removing all spaces within write and see how that turns out. (let's ignore for now that this might also affect keys&values).

On 1/12/2021 2:38 PM, Yuval Itzchakov wrote:
OK, this turned out to actually be a problem with the Kryo serialization. For some reason, it does not like that I try to generate a JSON with no spaces, only when I use 2 spaces will it work properly.
I am at loss of words.

Just to emphasize the difference:

No Spaces:

image.png
Doesn't work.

With spaces:

image.png

works fine.



On Mon, Jan 11, 2021 at 3:01 PM Yuval Itzchakov <[hidden email]> wrote:
Further debugging this issue, this currently seems unrelated to Kryo at all.

I have a stage that emits a case class down the stream. I can see the serialization part works fine, but when the receiving side is attempting to deserialize the the case class
it receives a NonSpanningWrapper that has already surpassed it's buffer limit:

image.png

Any help would be greatly appreciated.

On Mon, Jan 11, 2021 at 1:49 PM Yuval Itzchakov <[hidden email]> wrote:
Hi,

I've implemented a KryoSerializer for a specific JSON type in my application as I have a bunch of UDFs that depend on a RAW('io.circe.Json') encoder being available. The implementation is rather simple. When I run my Flink application with Kryo in trace logs, I see that data gets properly serialized / deserialized using the serializer. However, after about 30 seconds, the application blows up with the following error:

Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IndexOutOfBoundsException: pos: 140513145180741, length: 733793654, index: 69, offset: 0
at org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:198)
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101)
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92)
at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:103)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
... 11 more

Or with the following exception:

Caused by: java.lang.NegativeArraySizeException
at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:102)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)

I have, however, checked that the serialization works properly and there is no issue there.
I have the following registration during bootstrap:

flink.registerType(classOf[Json])
flink.addDefaultKryoSerializer(classOf[Json], classOf[JsonKryoSerializer])

And the following is the implementation of the serializer: 

import com.esotericsoftware.kryo.io.{ Input, Output }
import com.esotericsoftware.kryo.{ Kryo, Serializer }
import io.circe.Json
import io.circe.jawn.JawnParser

final class JsonKryoSerializer extends Serializer[Json](true, false) with Serializable {
  private val jawnParser = new JawnParser()

  override def write(kryo: Kryo, output: Output, `object`: Json): Unit =
    output.writeString(`object`.noSpaces)

  override def read(kryo: Kryo, input: Input, `type`: Class[Json]): Json = {
    val str = input.readString()

    if (str == null) Json.Null
    else
      jawnParser.parse(str) match {
        case Left(err)    => throw err
        case Right(value) => value
      }
  }

  override def copy(kryo: Kryo, original: Json): Json =
    jawnParser.parse(original.noSpaces) match {
      case Left(err)    => throw err
      case Right(value) => value
    }
}

Would appreciate any help on how to debug this further.

--
Best Regards,
Yuval Itzchakov.


--
Best Regards,
Yuval Itzchakov.


--
Best Regards,
Yuval Itzchakov.


Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.12 Kryo Serialization Error

Yuval Itzchakov
Hi Chesnay,
Turns out it didn't actually work, there were one or two successful runs but the problem still persists (it's a bit non deterministic, and doesn't always reproduce when parallelism is set to 1).

I turned off all Kryo custom serialization and am only using Flink provided one's ATM, the problem still persists.
There seems to be an issue with how Flink serializes these raw types over the wire, but I still can't put my finger as to what the problem is.

What I can see is that Flink tries to consume a HybridMemorySegment which contains one of these custom raw types I have and because of malformed content it receives a negative length for the byte array:

image.png

Content seems to be prepended with a bunch of NULL values which through off the length calculation:

image.png

But I still don't have the entire chain of execution wrapped mentally in my head, trying to figure it out.

An additional error I'm receiving, even when removing the problematic JSON field and switching it out for a String:

java.lang.IllegalStateException: When there are multiple buffers, an unfinished bufferConsumer can not be at the head of the buffers queue.
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198) ~[flink-core-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.pollBuffer(PipelinedSubpartition.java:277) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView.getNextBuffer(PipelinedSubpartitionView.java:51) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextBuffer(LocalInputChannel.java:214) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.waitAndGetNextData(SingleInputGate.java:626) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:603) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNext(SingleInputGate.java:591) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.pollNext(InputGateWithMetrics.java:109) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:142) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:157) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_275]

On Tue, Jan 12, 2021 at 4:14 PM Chesnay Schepler <[hidden email]> wrote:
I would think that the likely explanation is some bug in the formatting code of the library you are using.
Just for fun you could try manually removing all spaces within write and see how that turns out. (let's ignore for now that this might also affect keys&values).

On 1/12/2021 2:38 PM, Yuval Itzchakov wrote:
OK, this turned out to actually be a problem with the Kryo serialization. For some reason, it does not like that I try to generate a JSON with no spaces, only when I use 2 spaces will it work properly.
I am at loss of words.

Just to emphasize the difference:

No Spaces:

image.png
Doesn't work.

With spaces:

image.png

works fine.



On Mon, Jan 11, 2021 at 3:01 PM Yuval Itzchakov <[hidden email]> wrote:
Further debugging this issue, this currently seems unrelated to Kryo at all.

I have a stage that emits a case class down the stream. I can see the serialization part works fine, but when the receiving side is attempting to deserialize the the case class
it receives a NonSpanningWrapper that has already surpassed it's buffer limit:

image.png

Any help would be greatly appreciated.

On Mon, Jan 11, 2021 at 1:49 PM Yuval Itzchakov <[hidden email]> wrote:
Hi,

I've implemented a KryoSerializer for a specific JSON type in my application as I have a bunch of UDFs that depend on a RAW('io.circe.Json') encoder being available. The implementation is rather simple. When I run my Flink application with Kryo in trace logs, I see that data gets properly serialized / deserialized using the serializer. However, after about 30 seconds, the application blows up with the following error:

Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IndexOutOfBoundsException: pos: 140513145180741, length: 733793654, index: 69, offset: 0
at org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:198)
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101)
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92)
at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:103)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
... 11 more

Or with the following exception:

Caused by: java.lang.NegativeArraySizeException
at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:102)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)

I have, however, checked that the serialization works properly and there is no issue there.
I have the following registration during bootstrap:

flink.registerType(classOf[Json])
flink.addDefaultKryoSerializer(classOf[Json], classOf[JsonKryoSerializer])

And the following is the implementation of the serializer: 

import com.esotericsoftware.kryo.io.{ Input, Output }
import com.esotericsoftware.kryo.{ Kryo, Serializer }
import io.circe.Json
import io.circe.jawn.JawnParser

final class JsonKryoSerializer extends Serializer[Json](true, false) with Serializable {
  private val jawnParser = new JawnParser()

  override def write(kryo: Kryo, output: Output, `object`: Json): Unit =
    output.writeString(`object`.noSpaces)

  override def read(kryo: Kryo, input: Input, `type`: Class[Json]): Json = {
    val str = input.readString()

    if (str == null) Json.Null
    else
      jawnParser.parse(str) match {
        case Left(err)    => throw err
        case Right(value) => value
      }
  }

  override def copy(kryo: Kryo, original: Json): Json =
    jawnParser.parse(original.noSpaces) match {
      case Left(err)    => throw err
      case Right(value) => value
    }
}

Would appreciate any help on how to debug this further.

--
Best Regards,
Yuval Itzchakov.


--
Best Regards,
Yuval Itzchakov.


--
Best Regards,
Yuval Itzchakov.




--
Best Regards,
Yuval Itzchakov.
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.12 Kryo Serialization Error

Piotr Nowojski-5
Hi Yuval,

I have a couple of questions:

1. I see that you are using Flink 1.12.0, is that correct?
2. Have you tried running your application with a different Flink version? If you are using 1.12.0, could you check Flink 1.11.3, or vice versa?
3. What's the configuration that you are using? For example, have you enabled unaligned checkpoints or some other feature?
4. Is the problem still there if you replace Kryo with something else (Java's serialisation?)?
5. Could it be a problem with dependency convergence? Like maybe there are different versions of Flink jars present during runtime?
6. Lastly, would it be possible for you to prepare a minimal example that could reproduce the problem?

Piotrek

wt., 12 sty 2021 o 17:19 Yuval Itzchakov <[hidden email]> napisał(a):
Hi Chesnay,
Turns out it didn't actually work, there were one or two successful runs but the problem still persists (it's a bit non deterministic, and doesn't always reproduce when parallelism is set to 1).

I turned off all Kryo custom serialization and am only using Flink provided one's ATM, the problem still persists.
There seems to be an issue with how Flink serializes these raw types over the wire, but I still can't put my finger as to what the problem is.

What I can see is that Flink tries to consume a HybridMemorySegment which contains one of these custom raw types I have and because of malformed content it receives a negative length for the byte array:

image.png

Content seems to be prepended with a bunch of NULL values which through off the length calculation:

image.png

But I still don't have the entire chain of execution wrapped mentally in my head, trying to figure it out.

An additional error I'm receiving, even when removing the problematic JSON field and switching it out for a String:

java.lang.IllegalStateException: When there are multiple buffers, an unfinished bufferConsumer can not be at the head of the buffers queue.
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198) ~[flink-core-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.pollBuffer(PipelinedSubpartition.java:277) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView.getNextBuffer(PipelinedSubpartitionView.java:51) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextBuffer(LocalInputChannel.java:214) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.waitAndGetNextData(SingleInputGate.java:626) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:603) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNext(SingleInputGate.java:591) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.pollNext(InputGateWithMetrics.java:109) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:142) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:157) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_275]

On Tue, Jan 12, 2021 at 4:14 PM Chesnay Schepler <[hidden email]> wrote:
I would think that the likely explanation is some bug in the formatting code of the library you are using.
Just for fun you could try manually removing all spaces within write and see how that turns out. (let's ignore for now that this might also affect keys&values).

On 1/12/2021 2:38 PM, Yuval Itzchakov wrote:
OK, this turned out to actually be a problem with the Kryo serialization. For some reason, it does not like that I try to generate a JSON with no spaces, only when I use 2 spaces will it work properly.
I am at loss of words.

Just to emphasize the difference:

No Spaces:

image.png
Doesn't work.

With spaces:

image.png

works fine.



On Mon, Jan 11, 2021 at 3:01 PM Yuval Itzchakov <[hidden email]> wrote:
Further debugging this issue, this currently seems unrelated to Kryo at all.

I have a stage that emits a case class down the stream. I can see the serialization part works fine, but when the receiving side is attempting to deserialize the the case class
it receives a NonSpanningWrapper that has already surpassed it's buffer limit:

image.png

Any help would be greatly appreciated.

On Mon, Jan 11, 2021 at 1:49 PM Yuval Itzchakov <[hidden email]> wrote:
Hi,

I've implemented a KryoSerializer for a specific JSON type in my application as I have a bunch of UDFs that depend on a RAW('io.circe.Json') encoder being available. The implementation is rather simple. When I run my Flink application with Kryo in trace logs, I see that data gets properly serialized / deserialized using the serializer. However, after about 30 seconds, the application blows up with the following error:

Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IndexOutOfBoundsException: pos: 140513145180741, length: 733793654, index: 69, offset: 0
at org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:198)
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101)
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92)
at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:103)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
... 11 more

Or with the following exception:

Caused by: java.lang.NegativeArraySizeException
at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:102)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)

I have, however, checked that the serialization works properly and there is no issue there.
I have the following registration during bootstrap:

flink.registerType(classOf[Json])
flink.addDefaultKryoSerializer(classOf[Json], classOf[JsonKryoSerializer])

And the following is the implementation of the serializer: 

import com.esotericsoftware.kryo.io.{ Input, Output }
import com.esotericsoftware.kryo.{ Kryo, Serializer }
import io.circe.Json
import io.circe.jawn.JawnParser

final class JsonKryoSerializer extends Serializer[Json](true, false) with Serializable {
  private val jawnParser = new JawnParser()

  override def write(kryo: Kryo, output: Output, `object`: Json): Unit =
    output.writeString(`object`.noSpaces)

  override def read(kryo: Kryo, input: Input, `type`: Class[Json]): Json = {
    val str = input.readString()

    if (str == null) Json.Null
    else
      jawnParser.parse(str) match {
        case Left(err)    => throw err
        case Right(value) => value
      }
  }

  override def copy(kryo: Kryo, original: Json): Json =
    jawnParser.parse(original.noSpaces) match {
      case Left(err)    => throw err
      case Right(value) => value
    }
}

Would appreciate any help on how to debug this further.

--
Best Regards,
Yuval Itzchakov.


--
Best Regards,
Yuval Itzchakov.


--
Best Regards,
Yuval Itzchakov.




--
Best Regards,
Yuval Itzchakov.
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.12 Kryo Serialization Error

Timo Walther
Hi Yuval,

could you share a reproducible example with us?

I see you are using SQL / Table API with a RAW type. I could imagine
that the KryoSerializer is configured differently when serializing and
when deserializing. This might be due to `ExecutionConfig` not shipped
(or copied) through the stack correctly.

Even though an error in the stack should be visible immediately and not
after 30 seconds, I still would also investigate an error in this direction.

Regards,
Timo


On 13.01.21 09:47, Piotr Nowojski wrote:

> Hi Yuval,
>
> I have a couple of questions:
>
> 1. I see that you are using Flink 1.12.0, is that correct?
> 2. Have you tried running your application with a different Flink
> version? If you are using 1.12.0, could you check Flink 1.11.3, or vice
> versa?
> 3. What's the configuration that you are using? For example, have you
> enabled unaligned checkpoints or some other feature?
> 4. Is the problem still there if you replace Kryo with something else
> (Java's serialisation?)?
> 5. Could it be a problem with dependency convergence? Like maybe there
> are different versions of Flink jars present during runtime?
> 6. Lastly, would it be possible for you to prepare a minimal example
> that could reproduce the problem?
>
> Piotrek
>
> wt., 12 sty 2021 o 17:19 Yuval Itzchakov <[hidden email]
> <mailto:[hidden email]>> napisał(a):
>
>     Hi Chesnay,
>     Turns out it didn't actually work, there were one or two
>     successful runs but the problem still persists (it's a bit non
>     deterministic, and doesn't always reproduce when parallelism is set
>     to 1).
>
>     I turned off all Kryo custom serialization and am only using Flink
>     provided one's ATM, the problem still persists.
>     There seems to be an issue with how Flink serializes these raw types
>     over the wire, but I still can't put my finger as to what the
>     problem is.
>
>     What I can see is that Flink tries to consume a HybridMemorySegment
>     which contains one of these custom raw types I have and because of
>     malformed content it receives a negative length for the byte array:
>
>     image.png
>
>     Content seems to be prepended with a bunch of NULL values which
>     through off the length calculation:
>
>     image.png
>
>     But I still don't have the entire chain of execution wrapped
>     mentally in my head, trying to figure it out.
>
>     An additional error I'm receiving, even when removing the
>     problematic JSON field and switching it out for a String:
>
>     java.lang.IllegalStateException: When there are multiple buffers, an
>     unfinished bufferConsumer can not be at the head of the buffers queue.
>     at
>     org.apache.flink.util.Preconditions.checkState(Preconditions.java:198)
>     ~[flink-core-1.12.0.jar:1.12.0]
>     at
>     org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.pollBuffer(PipelinedSubpartition.java:277)
>     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>     at
>     org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView.getNextBuffer(PipelinedSubpartitionView.java:51)
>     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>     at
>     org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextBuffer(LocalInputChannel.java:214)
>     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>     at
>     org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.waitAndGetNextData(SingleInputGate.java:626)
>     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>     at
>     org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:603)
>     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>     at
>     org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNext(SingleInputGate.java:591)
>     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>     at
>     org.apache.flink.runtime.taskmanager.InputGateWithMetrics.pollNext(InputGateWithMetrics.java:109)
>     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>     at
>     org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:142)
>     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>     at
>     org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:157)
>     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>     at
>     org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>     at
>     org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>     at
>     org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>     at
>     org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>     at
>     org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_275]
>
>     On Tue, Jan 12, 2021 at 4:14 PM Chesnay Schepler <[hidden email]
>     <mailto:[hidden email]>> wrote:
>
>         I would think that the likely explanation is some bug in the
>         formatting code of the library you are using.
>         Just for fun you could try manually removing all spaces within
>         write and see how that turns out. (let's ignore for now that
>         this might also affect keys&values).
>
>         On 1/12/2021 2:38 PM, Yuval Itzchakov wrote:
>>         OK, this turned out to actually be a problem with the Kryo
>>         serialization. For some reason, it does not like that I try to
>>         generate a JSON with no spaces, only when I use 2 spaces will
>>         it work properly.
>>         I am at loss of words.
>>
>>         Just to emphasize the difference:
>>
>>         No Spaces:
>>
>>         image.png
>>         Doesn't work.
>>
>>         With spaces:
>>
>>         image.png
>>
>>         works fine.
>>
>>
>>
>>         On Mon, Jan 11, 2021 at 3:01 PM Yuval Itzchakov
>>         <[hidden email] <mailto:[hidden email]>> wrote:
>>
>>             Further debugging this issue, this currently seems
>>             unrelated to Kryo at all.
>>
>>             I have a stage that emits a case class down the stream. I
>>             can see the serialization part works fine, but when the
>>             receiving side is attempting to deserialize the the case class
>>             it receives a NonSpanningWrapper that has already
>>             surpassed it's buffer limit:
>>
>>             image.png
>>
>>             Any help would be greatly appreciated.
>>
>>             On Mon, Jan 11, 2021 at 1:49 PM Yuval Itzchakov
>>             <[hidden email] <mailto:[hidden email]>> wrote:
>>
>>                 Hi,
>>
>>                 I've implemented a KryoSerializer for a specific JSON
>>                 type in my application as I have a bunch of UDFs
>>                 that depend on a RAW('io.circe.Json') encoder being
>>                 available. The implementation is rather simple. When I
>>                 run my Flink application with Kryo in trace logs, I
>>                 see that data gets properly serialized / deserialized
>>                 using the serializer. However, after about 30 seconds,
>>                 the application blows up with the following error:
>>
>>                 Caused by: java.io.IOException: Serializer consumed
>>                 more bytes than the record had. This indicates broken
>>                 serialization. If you are using custom serialization
>>                 types (Value or Writable), check their serialization
>>                 methods. If you are using a Kryo-serialized type,
>>                 check the corresponding Kryo serializer.
>>                 at
>>                 org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
>>                 at
>>                 org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
>>                 at
>>                 org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
>>                 at
>>                 org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>                 at
>>                 org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>                 at
>>                 org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>                 at
>>                 org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>                 at
>>                 org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>                 at
>>                 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>                 at
>>                 org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>                 at
>>                 org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>                 at java.lang.Thread.run(Thread.java:748)
>>                 Caused by: java.lang.IndexOutOfBoundsException: pos:
>>                 140513145180741, length: 733793654, index: 69, offset: 0
>>                 at
>>                 org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:198)
>>                 at
>>                 org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101)
>>                 at
>>                 org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92)
>>                 at
>>                 org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:103)
>>                 at
>>                 org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
>>                 at
>>                 org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
>>                 at
>>                 org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>                 at
>>                 org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>                 at
>>                 org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>                 at
>>                 org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
>>                 ... 11 more
>>
>>                 Or with the following exception:
>>
>>                 Caused by: java.lang.NegativeArraySizeException
>>                 at
>>                 org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:102)
>>                 at
>>                 org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
>>                 at
>>                 org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
>>                 at
>>                 org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>                 at
>>                 org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>                 at
>>                 org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>                 at
>>                 org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
>>                 at
>>                 org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
>>                 at
>>                 org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
>>                 at
>>                 org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>                 at
>>                 org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>                 at
>>                 org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>                 at
>>                 org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>                 at
>>                 org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>                 at
>>                 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>                 at
>>                 org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>                 at
>>                 org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>                 at java.lang.Thread.run(Thread.java:748)
>>
>>                 I have, however, checked that the serialization works
>>                 properly and there is no issue there.
>>                 I have the following registration during bootstrap:
>>
>>                 flink.registerType(classOf[Json])
>>                 flink.addDefaultKryoSerializer(classOf[Json],
>>                 classOf[JsonKryoSerializer])
>>
>>                 And the following is the implementation of the
>>                 serializer:
>>
>>                 import com.esotericsoftware.kryo.io.{ Input, Output }
>>                 import com.esotericsoftware.kryo.{ Kryo, Serializer }
>>                 import io.circe.Json
>>                 import io.circe.jawn.JawnParser
>>
>>                 final class JsonKryoSerializer extends
>>                 Serializer[Json](true, false) with Serializable {
>>                   private val jawnParser = new JawnParser()
>>
>>                   override def write(kryo: Kryo, output: Output,
>>                 `object`: Json): Unit =
>>                     output.writeString(`object`.noSpaces)
>>
>>                   override def read(kryo: Kryo, input: Input, `type`:
>>                 Class[Json]): Json = {
>>                     val str = input.readString()
>>
>>                     if (str == null) Json.Null
>>                     else
>>                       jawnParser.parse(str) match {
>>                         case Left(err)    => throw err
>>                         case Right(value) => value
>>                       }
>>                   }
>>
>>                   override def copy(kryo: Kryo, original: Json): Json =
>>                     jawnParser.parse(original.noSpaces) match {
>>                       case Left(err)    => throw err
>>                       case Right(value) => value
>>                     }
>>                 }
>>
>>                 Would appreciate any help on how to debug this further.
>>
>>                 --
>>                 Best Regards,
>>                 Yuval Itzchakov.
>>
>>
>>
>>             --
>>             Best Regards,
>>             Yuval Itzchakov.
>>
>>
>>
>>         --
>>         Best Regards,
>>         Yuval Itzchakov.
>
>
>
>
>     --
>     Best Regards,
>     Yuval Itzchakov.
>

Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.12 Kryo Serialization Error

Yuval Itzchakov
Hi Timo and Piotr,

Let me try and answer all your questions:

Piotr:

1. Yes, I am using Flink 1.12.0
2. I have no tried downgrading to Flink 1.11.3, as I have features that are specific to 1.12 that I need (namely the ability to create a DataStreamScanProvider which was not available in previous versions)
3. I am using a pretty standard configuration. The only thing I've set was checkpointing (using the default MemoryStateBackend):

image.png
4. This is the interesting bit. When I try to create a small reproduction outside the codebase, using a simple source the issue does not reproduce, both with default Kryo serialization and with my own Kryo serializer.
5. No, here is the relevant bit of build.sbt (flinkVersion is set to 1.12)
image.png
6. I am trying to come up with a reproduction, thus far with no luck. Here's what I have so far: https://github.com/YuvalItzchakov/flink-bug-repro. I am afraid that there are many more moving parts that are affecting this issue (I have a custom flink source and sink involved)

Timo:

I am explicitly passing a serialized string of my custom Kryo serializer to the UDF (see https://github.com/YuvalItzchakov/flink-bug-repro/blob/master/src/main/scala/org/yuvalitzchakov/bugrepro/BugRepro.scala#L31). I can validate that both serialization and deserialization invoke the method defined on my custom serializer, if that's what you mean.
Otherwise, if there's a mismatch between the two serializers Flink blows up at runtime saying that the types don't match.

On Wed, Jan 13, 2021 at 1:19 PM Timo Walther <[hidden email]> wrote:
Hi Yuval,

could you share a reproducible example with us?

I see you are using SQL / Table API with a RAW type. I could imagine
that the KryoSerializer is configured differently when serializing and
when deserializing. This might be due to `ExecutionConfig` not shipped
(or copied) through the stack correctly.

Even though an error in the stack should be visible immediately and not
after 30 seconds, I still would also investigate an error in this direction.

Regards,
Timo


On 13.01.21 09:47, Piotr Nowojski wrote:
> Hi Yuval,
>
> I have a couple of questions:
>
> 1. I see that you are using Flink 1.12.0, is that correct?
> 2. Have you tried running your application with a different Flink
> version? If you are using 1.12.0, could you check Flink 1.11.3, or vice
> versa?
> 3. What's the configuration that you are using? For example, have you
> enabled unaligned checkpoints or some other feature?
> 4. Is the problem still there if you replace Kryo with something else
> (Java's serialisation?)?
> 5. Could it be a problem with dependency convergence? Like maybe there
> are different versions of Flink jars present during runtime?
> 6. Lastly, would it be possible for you to prepare a minimal example
> that could reproduce the problem?
>
> Piotrek
>
> wt., 12 sty 2021 o 17:19 Yuval Itzchakov <[hidden email]
> <mailto:[hidden email]>> napisał(a):
>
>     Hi Chesnay,
>     Turns out it didn't actually work, there were one or two
>     successful runs but the problem still persists (it's a bit non
>     deterministic, and doesn't always reproduce when parallelism is set
>     to 1).
>
>     I turned off all Kryo custom serialization and am only using Flink
>     provided one's ATM, the problem still persists.
>     There seems to be an issue with how Flink serializes these raw types
>     over the wire, but I still can't put my finger as to what the
>     problem is.
>
>     What I can see is that Flink tries to consume a HybridMemorySegment
>     which contains one of these custom raw types I have and because of
>     malformed content it receives a negative length for the byte array:
>
>     image.png
>
>     Content seems to be prepended with a bunch of NULL values which
>     through off the length calculation:
>
>     image.png
>
>     But I still don't have the entire chain of execution wrapped
>     mentally in my head, trying to figure it out.
>
>     An additional error I'm receiving, even when removing the
>     problematic JSON field and switching it out for a String:
>
>     java.lang.IllegalStateException: When there are multiple buffers, an
>     unfinished bufferConsumer can not be at the head of the buffers queue.
>     at
>     org.apache.flink.util.Preconditions.checkState(Preconditions.java:198)
>     ~[flink-core-1.12.0.jar:1.12.0]
>     at
>     org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.pollBuffer(PipelinedSubpartition.java:277)
>     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>     at
>     org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView.getNextBuffer(PipelinedSubpartitionView.java:51)
>     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>     at
>     org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextBuffer(LocalInputChannel.java:214)
>     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>     at
>     org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.waitAndGetNextData(SingleInputGate.java:626)
>     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>     at
>     org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:603)
>     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>     at
>     org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNext(SingleInputGate.java:591)
>     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>     at
>     org.apache.flink.runtime.taskmanager.InputGateWithMetrics.pollNext(InputGateWithMetrics.java:109)
>     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>     at
>     org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:142)
>     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>     at
>     org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:157)
>     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>     at
>     org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>     at
>     org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>     at
>     org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>     at
>     org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>     at
>     org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_275]
>
>     On Tue, Jan 12, 2021 at 4:14 PM Chesnay Schepler <[hidden email]
>     <mailto:[hidden email]>> wrote:
>
>         I would think that the likely explanation is some bug in the
>         formatting code of the library you are using.
>         Just for fun you could try manually removing all spaces within
>         write and see how that turns out. (let's ignore for now that
>         this might also affect keys&values).
>
>         On 1/12/2021 2:38 PM, Yuval Itzchakov wrote:
>>         OK, this turned out to actually be a problem with the Kryo
>>         serialization. For some reason, it does not like that I try to
>>         generate a JSON with no spaces, only when I use 2 spaces will
>>         it work properly.
>>         I am at loss of words.
>>
>>         Just to emphasize the difference:
>>
>>         No Spaces:
>>
>>         image.png
>>         Doesn't work.
>>
>>         With spaces:
>>
>>         image.png
>>
>>         works fine.
>>
>>
>>
>>         On Mon, Jan 11, 2021 at 3:01 PM Yuval Itzchakov
>>         <[hidden email] <mailto:[hidden email]>> wrote:
>>
>>             Further debugging this issue, this currently seems
>>             unrelated to Kryo at all.
>>
>>             I have a stage that emits a case class down the stream. I
>>             can see the serialization part works fine, but when the
>>             receiving side is attempting to deserialize the the case class
>>             it receives a NonSpanningWrapper that has already
>>             surpassed it's buffer limit:
>>
>>             image.png
>>
>>             Any help would be greatly appreciated.
>>
>>             On Mon, Jan 11, 2021 at 1:49 PM Yuval Itzchakov
>>             <[hidden email] <mailto:[hidden email]>> wrote:
>>
>>                 Hi,
>>
>>                 I've implemented a KryoSerializer for a specific JSON
>>                 type in my application as I have a bunch of UDFs
>>                 that depend on a RAW('io.circe.Json') encoder being
>>                 available. The implementation is rather simple. When I
>>                 run my Flink application with Kryo in trace logs, I
>>                 see that data gets properly serialized / deserialized
>>                 using the serializer. However, after about 30 seconds,
>>                 the application blows up with the following error:
>>
>>                 Caused by: java.io.IOException: Serializer consumed
>>                 more bytes than the record had. This indicates broken
>>                 serialization. If you are using custom serialization
>>                 types (Value or Writable), check their serialization
>>                 methods. If you are using a Kryo-serialized type,
>>                 check the corresponding Kryo serializer.
>>                 at
>>                 org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
>>                 at
>>                 org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
>>                 at
>>                 org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
>>                 at
>>                 org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>                 at
>>                 org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>                 at
>>                 org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>                 at
>>                 org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>                 at
>>                 org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>                 at
>>                 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>                 at
>>                 org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>                 at
>>                 org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>                 at java.lang.Thread.run(Thread.java:748)
>>                 Caused by: java.lang.IndexOutOfBoundsException: pos:
>>                 140513145180741, length: 733793654, index: 69, offset: 0
>>                 at
>>                 org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:198)
>>                 at
>>                 org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101)
>>                 at
>>                 org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92)
>>                 at
>>                 org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:103)
>>                 at
>>                 org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
>>                 at
>>                 org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
>>                 at
>>                 org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>                 at
>>                 org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>                 at
>>                 org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>                 at
>>                 org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
>>                 ... 11 more
>>
>>                 Or with the following exception:
>>
>>                 Caused by: java.lang.NegativeArraySizeException
>>                 at
>>                 org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:102)
>>                 at
>>                 org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
>>                 at
>>                 org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
>>                 at
>>                 org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>                 at
>>                 org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>                 at
>>                 org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>                 at
>>                 org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
>>                 at
>>                 org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
>>                 at
>>                 org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
>>                 at
>>                 org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>                 at
>>                 org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>                 at
>>                 org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>                 at
>>                 org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>                 at
>>                 org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>                 at
>>                 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>                 at
>>                 org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>                 at
>>                 org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>                 at java.lang.Thread.run(Thread.java:748)
>>
>>                 I have, however, checked that the serialization works
>>                 properly and there is no issue there.
>>                 I have the following registration during bootstrap:
>>
>>                 flink.registerType(classOf[Json])
>>                 flink.addDefaultKryoSerializer(classOf[Json],
>>                 classOf[JsonKryoSerializer])
>>
>>                 And the following is the implementation of the
>>                 serializer:
>>
>>                 import com.esotericsoftware.kryo.io.{ Input, Output }
>>                 import com.esotericsoftware.kryo.{ Kryo, Serializer }
>>                 import io.circe.Json
>>                 import io.circe.jawn.JawnParser
>>
>>                 final class JsonKryoSerializer extends
>>                 Serializer[Json](true, false) with Serializable {
>>                   private val jawnParser = new JawnParser()
>>
>>                   override def write(kryo: Kryo, output: Output,
>>                 `object`: Json): Unit =
>>                     output.writeString(`object`.noSpaces)
>>
>>                   override def read(kryo: Kryo, input: Input, `type`:
>>                 Class[Json]): Json = {
>>                     val str = input.readString()
>>
>>                     if (str == null) Json.Null
>>                     else
>>                       jawnParser.parse(str) match {
>>                         case Left(err)    => throw err
>>                         case Right(value) => value
>>                       }
>>                   }
>>
>>                   override def copy(kryo: Kryo, original: Json): Json =
>>                     jawnParser.parse(original.noSpaces) match {
>>                       case Left(err)    => throw err
>>                       case Right(value) => value
>>                     }
>>                 }
>>
>>                 Would appreciate any help on how to debug this further.
>>
>>                 --
>>                 Best Regards,
>>                 Yuval Itzchakov.
>>
>>
>>
>>             --
>>             Best Regards,
>>             Yuval Itzchakov.
>>
>>
>>
>>         --
>>         Best Regards,
>>         Yuval Itzchakov.
>
>
>
>
>     --
>     Best Regards,
>     Yuval Itzchakov.
>



--
Best Regards,
Yuval Itzchakov.
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.12 Kryo Serialization Error

Timo Walther
Hi Yuval,

thanks for sharing some code with us. I scanned the code but could not
find anything suspicious from an API perspective. By using the full RAW
serializable string, we should actually be on the save side when it
comes to configure the Kryo serializer.

I would suggest to further investigate in the checkpointing area if it
only occurs when checkpointing is enabled.

Regards,
Timo

On 13.01.21 13:35, Yuval Itzchakov wrote:

> Hi Timo and Piotr,
>
> Let me try and answer all your questions:
>
> Piotr:
>
> 1. Yes, I am using Flink 1.12.0
> 2. I have no tried downgrading to Flink 1.11.3, as I have features that
> are specific to 1.12 that I need (namely the ability to create
> a DataStreamScanProvider which was not available in previous versions)
> 3. I am using a pretty standard configuration. The only thing I've set
> was checkpointing (using the default MemoryStateBackend):
>
> image.png
> 4. This is the interesting bit. When I try to create a small
> reproduction outside the codebase, using a simple source the issue does
> not reproduce, both with default Kryo serialization and with my own Kryo
> serializer.
> 5. No, here is the relevant bit of build.sbt (flinkVersion is set to 1.12)
> image.png
> 6. I am trying to come up with a reproduction, thus far with no luck.
> Here's what I have so far:
> https://github.com/YuvalItzchakov/flink-bug-repro 
> <https://github.com/YuvalItzchakov/flink-bug-repro>. I am afraid that
> there are many more moving parts that are affecting this issue (I have a
> custom flink source and sink involved)
>
> Timo:
>
> I am explicitly passing a serialized string of my custom Kryo serializer
> to the UDF (see
> https://github.com/YuvalItzchakov/flink-bug-repro/blob/master/src/main/scala/org/yuvalitzchakov/bugrepro/BugRepro.scala#L31 
> <https://github.com/YuvalItzchakov/flink-bug-repro/blob/master/src/main/scala/org/yuvalitzchakov/bugrepro/BugRepro.scala#L31>).
> I can validate that both serialization and deserialization invoke the
> method defined on my custom serializer, if that's what you mean.
> Otherwise, if there's a mismatch between the two serializers Flink blows
> up at runtime saying that the types don't match.
>
> On Wed, Jan 13, 2021 at 1:19 PM Timo Walther <[hidden email]
> <mailto:[hidden email]>> wrote:
>
>     Hi Yuval,
>
>     could you share a reproducible example with us?
>
>     I see you are using SQL / Table API with a RAW type. I could imagine
>     that the KryoSerializer is configured differently when serializing and
>     when deserializing. This might be due to `ExecutionConfig` not shipped
>     (or copied) through the stack correctly.
>
>     Even though an error in the stack should be visible immediately and not
>     after 30 seconds, I still would also investigate an error in this
>     direction.
>
>     Regards,
>     Timo
>
>
>     On 13.01.21 09:47, Piotr Nowojski wrote:
>      > Hi Yuval,
>      >
>      > I have a couple of questions:
>      >
>      > 1. I see that you are using Flink 1.12.0, is that correct?
>      > 2. Have you tried running your application with a different Flink
>      > version? If you are using 1.12.0, could you check Flink 1.11.3,
>     or vice
>      > versa?
>      > 3. What's the configuration that you are using? For example, have
>     you
>      > enabled unaligned checkpoints or some other feature?
>      > 4. Is the problem still there if you replace Kryo with something
>     else
>      > (Java's serialisation?)?
>      > 5. Could it be a problem with dependency convergence? Like maybe
>     there
>      > are different versions of Flink jars present during runtime?
>      > 6. Lastly, would it be possible for you to prepare a minimal example
>      > that could reproduce the problem?
>      >
>      > Piotrek
>      >
>      > wt., 12 sty 2021 o 17:19 Yuval Itzchakov <[hidden email]
>     <mailto:[hidden email]>
>      > <mailto:[hidden email] <mailto:[hidden email]>>> napisał(a):
>      >
>      >     Hi Chesnay,
>      >     Turns out it didn't actually work, there were one or two
>      >     successful runs but the problem still persists (it's a bit non
>      >     deterministic, and doesn't always reproduce when parallelism
>     is set
>      >     to 1).
>      >
>      >     I turned off all Kryo custom serialization and am only using
>     Flink
>      >     provided one's ATM, the problem still persists.
>      >     There seems to be an issue with how Flink serializes these
>     raw types
>      >     over the wire, but I still can't put my finger as to what the
>      >     problem is.
>      >
>      >     What I can see is that Flink tries to consume a
>     HybridMemorySegment
>      >     which contains one of these custom raw types I have and
>     because of
>      >     malformed content it receives a negative length for the byte
>     array:
>      >
>      >     image.png
>      >
>      >     Content seems to be prepended with a bunch of NULL values which
>      >     through off the length calculation:
>      >
>      >     image.png
>      >
>      >     But I still don't have the entire chain of execution wrapped
>      >     mentally in my head, trying to figure it out.
>      >
>      >     An additional error I'm receiving, even when removing the
>      >     problematic JSON field and switching it out for a String:
>      >
>      >     java.lang.IllegalStateException: When there are multiple
>     buffers, an
>      >     unfinished bufferConsumer can not be at the head of the
>     buffers queue.
>      >     at
>      >  
>       org.apache.flink.util.Preconditions.checkState(Preconditions.java:198)
>      >     ~[flink-core-1.12.0.jar:1.12.0]
>      >     at
>      > org.apache.flink.runtime.io
>     <http://org.apache.flink.runtime.io>.network.partition.PipelinedSubpartition.pollBuffer(PipelinedSubpartition.java:277)
>      >     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>      >     at
>      > org.apache.flink.runtime.io
>     <http://org.apache.flink.runtime.io>.network.partition.PipelinedSubpartitionView.getNextBuffer(PipelinedSubpartitionView.java:51)
>      >     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>      >     at
>      > org.apache.flink.runtime.io
>     <http://org.apache.flink.runtime.io>.network.partition.consumer.LocalInputChannel.getNextBuffer(LocalInputChannel.java:214)
>      >     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>      >     at
>      > org.apache.flink.runtime.io
>     <http://org.apache.flink.runtime.io>.network.partition.consumer.SingleInputGate.waitAndGetNextData(SingleInputGate.java:626)
>      >     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>      >     at
>      > org.apache.flink.runtime.io
>     <http://org.apache.flink.runtime.io>.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:603)
>      >     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>      >     at
>      > org.apache.flink.runtime.io
>     <http://org.apache.flink.runtime.io>.network.partition.consumer.SingleInputGate.pollNext(SingleInputGate.java:591)
>      >     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>      >     at
>      >  
>       org.apache.flink.runtime.taskmanager.InputGateWithMetrics.pollNext(InputGateWithMetrics.java:109)
>      >     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>      >     at
>      >     org.apache.flink.streaming.runtime.io
>     <http://runtime.io>.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:142)
>      >     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>      >     at
>      >     org.apache.flink.streaming.runtime.io
>     <http://runtime.io>.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:157)
>      >     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>      >     at
>      >     org.apache.flink.streaming.runtime.io
>     <http://runtime.io>.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>      >     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>      >     at
>      >  
>       org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>      >     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>      >     at
>      >  
>       org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>      >     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>      >     at
>      >  
>       org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>      >     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>      >     at
>      >  
>       org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>      >     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>      >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>      >     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>      >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>      >     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>      >     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_275]
>      >
>      >     On Tue, Jan 12, 2021 at 4:14 PM Chesnay Schepler
>     <[hidden email] <mailto:[hidden email]>
>      >     <mailto:[hidden email] <mailto:[hidden email]>>> wrote:
>      >
>      >         I would think that the likely explanation is some bug in the
>      >         formatting code of the library you are using.
>      >         Just for fun you could try manually removing all spaces
>     within
>      >         write and see how that turns out. (let's ignore for now that
>      >         this might also affect keys&values).
>      >
>      >         On 1/12/2021 2:38 PM, Yuval Itzchakov wrote:
>      >>         OK, this turned out to actually be a problem with the Kryo
>      >>         serialization. For some reason, it does not like that I
>     try to
>      >>         generate a JSON with no spaces, only when I use 2 spaces
>     will
>      >>         it work properly.
>      >>         I am at loss of words.
>      >>
>      >>         Just to emphasize the difference:
>      >>
>      >>         No Spaces:
>      >>
>      >>         image.png
>      >>         Doesn't work.
>      >>
>      >>         With spaces:
>      >>
>      >>         image.png
>      >>
>      >>         works fine.
>      >>
>      >>
>      >>
>      >>         On Mon, Jan 11, 2021 at 3:01 PM Yuval Itzchakov
>      >>         <[hidden email] <mailto:[hidden email]>
>     <mailto:[hidden email] <mailto:[hidden email]>>> wrote:
>      >>
>      >>             Further debugging this issue, this currently seems
>      >>             unrelated to Kryo at all.
>      >>
>      >>             I have a stage that emits a case class down the
>     stream. I
>      >>             can see the serialization part works fine, but when the
>      >>             receiving side is attempting to deserialize the the
>     case class
>      >>             it receives a NonSpanningWrapper that has already
>      >>             surpassed it's buffer limit:
>      >>
>      >>             image.png
>      >>
>      >>             Any help would be greatly appreciated.
>      >>
>      >>             On Mon, Jan 11, 2021 at 1:49 PM Yuval Itzchakov
>      >>             <[hidden email] <mailto:[hidden email]>
>     <mailto:[hidden email] <mailto:[hidden email]>>> wrote:
>      >>
>      >>                 Hi,
>      >>
>      >>                 I've implemented a KryoSerializer for a specific
>     JSON
>      >>                 type in my application as I have a bunch of UDFs
>      >>                 that depend on a RAW('io.circe.Json') encoder being
>      >>                 available. The implementation is rather simple.
>     When I
>      >>                 run my Flink application with Kryo in trace logs, I
>      >>                 see that data gets properly serialized /
>     deserialized
>      >>                 using the serializer. However, after about 30
>     seconds,
>      >>                 the application blows up with the following error:
>      >>
>      >>                 Caused by: java.io.IOException: Serializer consumed
>      >>                 more bytes than the record had. This indicates
>     broken
>      >>                 serialization. If you are using custom serialization
>      >>                 types (Value or Writable), check their serialization
>      >>                 methods. If you are using a Kryo-serialized type,
>      >>                 check the corresponding Kryo serializer.
>      >>                 at
>      >> org.apache.flink.runtime.io
>     <http://org.apache.flink.runtime.io>.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
>      >>                 at
>      >> org.apache.flink.runtime.io
>     <http://org.apache.flink.runtime.io>.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
>      >>                 at
>      >> org.apache.flink.runtime.io
>     <http://org.apache.flink.runtime.io>.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
>      >>                 at
>      >>                 org.apache.flink.streaming.runtime.io
>     <http://runtime.io>.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>      >>                 at
>      >>                 org.apache.flink.streaming.runtime.io
>     <http://runtime.io>.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>      >>                 at
>      >>              
>       org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>      >>                 at
>      >>              
>       org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>      >>                 at
>      >>              
>       org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>      >>                 at
>      >>              
>       org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>      >>                 at
>      >>              
>       org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>      >>                 at
>      >>              
>       org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>      >>                 at java.lang.Thread.run(Thread.java:748)
>      >>                 Caused by: java.lang.IndexOutOfBoundsException: pos:
>      >>                 140513145180741, length: 733793654, index: 69,
>     offset: 0
>      >>                 at
>      >>              
>       org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:198)
>      >>                 at
>      >> org.apache.flink.runtime.io
>     <http://org.apache.flink.runtime.io>.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101)
>      >>                 at
>      >> org.apache.flink.runtime.io
>     <http://org.apache.flink.runtime.io>.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92)
>      >>                 at
>      >>              
>       org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:103)
>      >>                 at
>      >>              
>       org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
>      >>                 at
>      >>              
>       org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
>      >>                 at
>      >>              
>       org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>      >>                 at
>      >>              
>       org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>      >>                 at
>      >>              
>       org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>      >>                 at
>      >> org.apache.flink.runtime.io
>     <http://org.apache.flink.runtime.io>.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
>      >>                 ... 11 more
>      >>
>      >>                 Or with the following exception:
>      >>
>      >>                 Caused by: java.lang.NegativeArraySizeException
>      >>                 at
>      >>              
>       org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:102)
>      >>                 at
>      >>              
>       org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
>      >>                 at
>      >>              
>       org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
>      >>                 at
>      >>              
>       org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>      >>                 at
>      >>              
>       org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>      >>                 at
>      >>              
>       org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>      >>                 at
>      >> org.apache.flink.runtime.io
>     <http://org.apache.flink.runtime.io>.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
>      >>                 at
>      >> org.apache.flink.runtime.io
>     <http://org.apache.flink.runtime.io>.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
>      >>                 at
>      >> org.apache.flink.runtime.io
>     <http://org.apache.flink.runtime.io>.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
>      >>                 at
>      >>                 org.apache.flink.streaming.runtime.io
>     <http://runtime.io>.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>      >>                 at
>      >>                 org.apache.flink.streaming.runtime.io
>     <http://runtime.io>.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>      >>                 at
>      >>              
>       org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>      >>                 at
>      >>              
>       org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>      >>                 at
>      >>              
>       org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>      >>                 at
>      >>              
>       org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>      >>                 at
>      >>              
>       org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>      >>                 at
>      >>              
>       org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>      >>                 at java.lang.Thread.run(Thread.java:748)
>      >>
>      >>                 I have, however, checked that the serialization
>     works
>      >>                 properly and there is no issue there.
>      >>                 I have the following registration during bootstrap:
>      >>
>      >>                 flink.registerType(classOf[Json])
>      >>                 flink.addDefaultKryoSerializer(classOf[Json],
>      >>                 classOf[JsonKryoSerializer])
>      >>
>      >>                 And the following is the implementation of the
>      >>                 serializer:
>      >>
>      >>                 import com.esotericsoftware.kryo.io.{ Input,
>     Output }
>      >>                 import com.esotericsoftware.kryo.{ Kryo,
>     Serializer }
>      >>                 import io.circe.Json
>      >>                 import io.circe.jawn.JawnParser
>      >>
>      >>                 final class JsonKryoSerializer extends
>      >>                 Serializer[Json](true, false) with Serializable {
>      >>                   private val jawnParser = new JawnParser()
>      >>
>      >>                   override def write(kryo: Kryo, output: Output,
>      >>                 `object`: Json): Unit =
>      >>                     output.writeString(`object`.noSpaces)
>      >>
>      >>                   override def read(kryo: Kryo, input: Input,
>     `type`:
>      >>                 Class[Json]): Json = {
>      >>                     val str = input.readString()
>      >>
>      >>                     if (str == null) Json.Null
>      >>                     else
>      >>                       jawnParser.parse(str) match {
>      >>                         case Left(err)    => throw err
>      >>                         case Right(value) => value
>      >>                       }
>      >>                   }
>      >>
>      >>                   override def copy(kryo: Kryo, original: Json):
>     Json =
>      >>                     jawnParser.parse(original.noSpaces) match {
>      >>                       case Left(err)    => throw err
>      >>                       case Right(value) => value
>      >>                     }
>      >>                 }
>      >>
>      >>                 Would appreciate any help on how to debug this
>     further.
>      >>
>      >>                 --
>      >>                 Best Regards,
>      >>                 Yuval Itzchakov.
>      >>
>      >>
>      >>
>      >>             --
>      >>             Best Regards,
>      >>             Yuval Itzchakov.
>      >>
>      >>
>      >>
>      >>         --
>      >>         Best Regards,
>      >>         Yuval Itzchakov.
>      >
>      >
>      >
>      >
>      >     --
>      >     Best Regards,
>      >     Yuval Itzchakov.
>      >
>
>
>
> --
> Best Regards,
> Yuval Itzchakov.

Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.12 Kryo Serialization Error

Timo Walther
I ported the code to the Flink code base. Because I had issues with SBT
and Scala 2.12. Note it uses an older version of circe. I'm just pasting
it here in case it helps someone.

Regards,
Timo

On 18.01.21 13:51, Timo Walther wrote:

> Hi Yuval,
>
> thanks for sharing some code with us. I scanned the code but could not
> find anything suspicious from an API perspective. By using the full RAW
> serializable string, we should actually be on the save side when it
> comes to configure the Kryo serializer.
>
> I would suggest to further investigate in the checkpointing area if it
> only occurs when checkpointing is enabled.
>
> Regards,
> Timo
>
> On 13.01.21 13:35, Yuval Itzchakov wrote:
>> Hi Timo and Piotr,
>>
>> Let me try and answer all your questions:
>>
>> Piotr:
>>
>> 1. Yes, I am using Flink 1.12.0
>> 2. I have no tried downgrading to Flink 1.11.3, as I have features
>> that are specific to 1.12 that I need (namely the ability to create
>> a DataStreamScanProvider which was not available in previous versions)
>> 3. I am using a pretty standard configuration. The only thing I've set
>> was checkpointing (using the default MemoryStateBackend):
>>
>> image.png
>> 4. This is the interesting bit. When I try to create a small
>> reproduction outside the codebase, using a simple source the issue
>> does not reproduce, both with default Kryo serialization and with my
>> own Kryo serializer.
>> 5. No, here is the relevant bit of build.sbt (flinkVersion is set to
>> 1.12)
>> image.png
>> 6. I am trying to come up with a reproduction, thus far with no luck.
>> Here's what I have so far:
>> https://github.com/YuvalItzchakov/flink-bug-repro 
>> <https://github.com/YuvalItzchakov/flink-bug-repro>. I am afraid that
>> there are many more moving parts that are affecting this issue (I have
>> a custom flink source and sink involved)
>>
>> Timo:
>>
>> I am explicitly passing a serialized string of my custom Kryo
>> serializer to the UDF (see
>> https://github.com/YuvalItzchakov/flink-bug-repro/blob/master/src/main/scala/org/yuvalitzchakov/bugrepro/BugRepro.scala#L31 
>> <https://github.com/YuvalItzchakov/flink-bug-repro/blob/master/src/main/scala/org/yuvalitzchakov/bugrepro/BugRepro.scala#L31>).
>> I can validate that both serialization and deserialization invoke the
>> method defined on my custom serializer, if that's what you mean.
>> Otherwise, if there's a mismatch between the two serializers Flink
>> blows up at runtime saying that the types don't match.
>>
>> On Wed, Jan 13, 2021 at 1:19 PM Timo Walther <[hidden email]
>> <mailto:[hidden email]>> wrote:
>>
>>     Hi Yuval,
>>
>>     could you share a reproducible example with us?
>>
>>     I see you are using SQL / Table API with a RAW type. I could imagine
>>     that the KryoSerializer is configured differently when serializing
>> and
>>     when deserializing. This might be due to `ExecutionConfig` not
>> shipped
>>     (or copied) through the stack correctly.
>>
>>     Even though an error in the stack should be visible immediately
>> and not
>>     after 30 seconds, I still would also investigate an error in this
>>     direction.
>>
>>     Regards,
>>     Timo
>>
>>
>>     On 13.01.21 09:47, Piotr Nowojski wrote:
>>      > Hi Yuval,
>>      >
>>      > I have a couple of questions:
>>      >
>>      > 1. I see that you are using Flink 1.12.0, is that correct?
>>      > 2. Have you tried running your application with a different Flink
>>      > version? If you are using 1.12.0, could you check Flink 1.11.3,
>>     or vice
>>      > versa?
>>      > 3. What's the configuration that you are using? For example, have
>>     you
>>      > enabled unaligned checkpoints or some other feature?
>>      > 4. Is the problem still there if you replace Kryo with something
>>     else
>>      > (Java's serialisation?)?
>>      > 5. Could it be a problem with dependency convergence? Like maybe
>>     there
>>      > are different versions of Flink jars present during runtime?
>>      > 6. Lastly, would it be possible for you to prepare a minimal
>> example
>>      > that could reproduce the problem?
>>      >
>>      > Piotrek
>>      >
>>      > wt., 12 sty 2021 o 17:19 Yuval Itzchakov <[hidden email]
>>     <mailto:[hidden email]>
>>      > <mailto:[hidden email] <mailto:[hidden email]>>> napisał(a):
>>      >
>>      >     Hi Chesnay,
>>      >     Turns out it didn't actually work, there were one or two
>>      >     successful runs but the problem still persists (it's a bit non
>>      >     deterministic, and doesn't always reproduce when parallelism
>>     is set
>>      >     to 1).
>>      >
>>      >     I turned off all Kryo custom serialization and am only using
>>     Flink
>>      >     provided one's ATM, the problem still persists.
>>      >     There seems to be an issue with how Flink serializes these
>>     raw types
>>      >     over the wire, but I still can't put my finger as to what the
>>      >     problem is.
>>      >
>>      >     What I can see is that Flink tries to consume a
>>     HybridMemorySegment
>>      >     which contains one of these custom raw types I have and
>>     because of
>>      >     malformed content it receives a negative length for the byte
>>     array:
>>      >
>>      >     image.png
>>      >
>>      >     Content seems to be prepended with a bunch of NULL values
>> which
>>      >     through off the length calculation:
>>      >
>>      >     image.png
>>      >
>>      >     But I still don't have the entire chain of execution wrapped
>>      >     mentally in my head, trying to figure it out.
>>      >
>>      >     An additional error I'm receiving, even when removing the
>>      >     problematic JSON field and switching it out for a String:
>>      >
>>      >     java.lang.IllegalStateException: When there are multiple
>>     buffers, an
>>      >     unfinished bufferConsumer can not be at the head of the
>>     buffers queue.
>>      >     at
>>      >      
>>  org.apache.flink.util.Preconditions.checkState(Preconditions.java:198)
>>      >     ~[flink-core-1.12.0.jar:1.12.0]
>>      >     at
>>      > org.apache.flink.runtime.io
>>    
>> <http://org.apache.flink.runtime.io>.network.partition.PipelinedSubpartition.pollBuffer(PipelinedSubpartition.java:277)
>>
>>      >     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>>      >     at
>>      > org.apache.flink.runtime.io
>>    
>> <http://org.apache.flink.runtime.io>.network.partition.PipelinedSubpartitionView.getNextBuffer(PipelinedSubpartitionView.java:51)
>>
>>      >     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>>      >     at
>>      > org.apache.flink.runtime.io
>>    
>> <http://org.apache.flink.runtime.io>.network.partition.consumer.LocalInputChannel.getNextBuffer(LocalInputChannel.java:214)
>>
>>      >     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>>      >     at
>>      > org.apache.flink.runtime.io
>>    
>> <http://org.apache.flink.runtime.io>.network.partition.consumer.SingleInputGate.waitAndGetNextData(SingleInputGate.java:626)
>>
>>      >     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>>      >     at
>>      > org.apache.flink.runtime.io
>>    
>> <http://org.apache.flink.runtime.io>.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:603)
>>
>>      >     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>>      >     at
>>      > org.apache.flink.runtime.io
>>    
>> <http://org.apache.flink.runtime.io>.network.partition.consumer.SingleInputGate.pollNext(SingleInputGate.java:591)
>>
>>      >     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>>      >     at
>>      >      
>>  org.apache.flink.runtime.taskmanager.InputGateWithMetrics.pollNext(InputGateWithMetrics.java:109)
>>
>>      >     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>>      >     at
>>      >     org.apache.flink.streaming.runtime.io
>>    
>> <http://runtime.io>.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:142)
>>
>>      >     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>>      >     at
>>      >     org.apache.flink.streaming.runtime.io
>>    
>> <http://runtime.io>.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:157)
>>
>>      >     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>>      >     at
>>      >     org.apache.flink.streaming.runtime.io
>>    
>> <http://runtime.io>.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>
>>      >     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>>      >     at
>>      >      
>>  org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>
>>      >     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>>      >     at
>>      >      
>>  org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>
>>      >     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>>      >     at
>>      >      
>>  org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>
>>      >     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>>      >     at
>>      >      
>>  org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>
>>      >     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>>      >     at
>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>      >     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>>      >     at
>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>      >     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>>      >     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_275]
>>      >
>>      >     On Tue, Jan 12, 2021 at 4:14 PM Chesnay Schepler
>>     <[hidden email] <mailto:[hidden email]>
>>      >     <mailto:[hidden email] <mailto:[hidden email]>>>
>> wrote:
>>      >
>>      >         I would think that the likely explanation is some bug
>> in the
>>      >         formatting code of the library you are using.
>>      >         Just for fun you could try manually removing all spaces
>>     within
>>      >         write and see how that turns out. (let's ignore for now
>> that
>>      >         this might also affect keys&values).
>>      >
>>      >         On 1/12/2021 2:38 PM, Yuval Itzchakov wrote:
>>      >>         OK, this turned out to actually be a problem with the
>> Kryo
>>      >>         serialization. For some reason, it does not like that I
>>     try to
>>      >>         generate a JSON with no spaces, only when I use 2 spaces
>>     will
>>      >>         it work properly.
>>      >>         I am at loss of words.
>>      >>
>>      >>         Just to emphasize the difference:
>>      >>
>>      >>         No Spaces:
>>      >>
>>      >>         image.png
>>      >>         Doesn't work.
>>      >>
>>      >>         With spaces:
>>      >>
>>      >>         image.png
>>      >>
>>      >>         works fine.
>>      >>
>>      >>
>>      >>
>>      >>         On Mon, Jan 11, 2021 at 3:01 PM Yuval Itzchakov
>>      >>         <[hidden email] <mailto:[hidden email]>
>>     <mailto:[hidden email] <mailto:[hidden email]>>> wrote:
>>      >>
>>      >>             Further debugging this issue, this currently seems
>>      >>             unrelated to Kryo at all.
>>      >>
>>      >>             I have a stage that emits a case class down the
>>     stream. I
>>      >>             can see the serialization part works fine, but
>> when the
>>      >>             receiving side is attempting to deserialize the the
>>     case class
>>      >>             it receives a NonSpanningWrapper that has already
>>      >>             surpassed it's buffer limit:
>>      >>
>>      >>             image.png
>>      >>
>>      >>             Any help would be greatly appreciated.
>>      >>
>>      >>             On Mon, Jan 11, 2021 at 1:49 PM Yuval Itzchakov
>>      >>             <[hidden email] <mailto:[hidden email]>
>>     <mailto:[hidden email] <mailto:[hidden email]>>> wrote:
>>      >>
>>      >>                 Hi,
>>      >>
>>      >>                 I've implemented a KryoSerializer for a specific
>>     JSON
>>      >>                 type in my application as I have a bunch of UDFs
>>      >>                 that depend on a RAW('io.circe.Json') encoder
>> being
>>      >>                 available. The implementation is rather simple.
>>     When I
>>      >>                 run my Flink application with Kryo in trace
>> logs, I
>>      >>                 see that data gets properly serialized /
>>     deserialized
>>      >>                 using the serializer. However, after about 30
>>     seconds,
>>      >>                 the application blows up with the following
>> error:
>>      >>
>>      >>                 Caused by: java.io.IOException: Serializer
>> consumed
>>      >>                 more bytes than the record had. This indicates
>>     broken
>>      >>                 serialization. If you are using custom
>> serialization
>>      >>                 types (Value or Writable), check their
>> serialization
>>      >>                 methods. If you are using a Kryo-serialized type,
>>      >>                 check the corresponding Kryo serializer.
>>      >>                 at
>>      >> org.apache.flink.runtime.io
>>    
>> <http://org.apache.flink.runtime.io>.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
>>
>>      >>                 at
>>      >> org.apache.flink.runtime.io
>>    
>> <http://org.apache.flink.runtime.io>.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
>>
>>      >>                 at
>>      >> org.apache.flink.runtime.io
>>    
>> <http://org.apache.flink.runtime.io>.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
>>
>>      >>                 at
>>      >>                 org.apache.flink.streaming.runtime.io
>>    
>> <http://runtime.io>.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>
>>      >>                 at
>>      >>                 org.apache.flink.streaming.runtime.io
>>    
>> <http://runtime.io>.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>
>>      >>                 at
>>      >>      
>>  org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>
>>      >>                 at
>>      >>      
>>  org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>
>>      >>                 at
>>      >>      
>>  org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>
>>      >>                 at
>>      >>      
>>  org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>
>>      >>                 at
>>      >>      
>>  org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>      >>                 at
>>      >>      
>>  org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>      >>                 at java.lang.Thread.run(Thread.java:748)
>>      >>                 Caused by:
>> java.lang.IndexOutOfBoundsException: pos:
>>      >>                 140513145180741, length: 733793654, index: 69,
>>     offset: 0
>>      >>                 at
>>      >>      
>>  org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:198)
>>
>>      >>                 at
>>      >> org.apache.flink.runtime.io
>>    
>> <http://org.apache.flink.runtime.io>.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101)
>>
>>      >>                 at
>>      >> org.apache.flink.runtime.io
>>    
>> <http://org.apache.flink.runtime.io>.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92)
>>
>>      >>                 at
>>      >>      
>>  org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:103)
>>
>>      >>                 at
>>      >>      
>>  org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
>>
>>      >>                 at
>>      >>      
>>  org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
>>
>>      >>                 at
>>      >>      
>>  org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>
>>      >>                 at
>>      >>      
>>  org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>
>>      >>                 at
>>      >>      
>>  org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>
>>      >>                 at
>>      >> org.apache.flink.runtime.io
>>    
>> <http://org.apache.flink.runtime.io>.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
>>
>>      >>                 ... 11 more
>>      >>
>>      >>                 Or with the following exception:
>>      >>
>>      >>                 Caused by: java.lang.NegativeArraySizeException
>>      >>                 at
>>      >>      
>>  org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:102)
>>
>>      >>                 at
>>      >>      
>>  org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
>>
>>      >>                 at
>>      >>      
>>  org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
>>
>>      >>                 at
>>      >>      
>>  org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>
>>      >>                 at
>>      >>      
>>  org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>
>>      >>                 at
>>      >>      
>>  org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>
>>      >>                 at
>>      >> org.apache.flink.runtime.io
>>    
>> <http://org.apache.flink.runtime.io>.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
>>
>>      >>                 at
>>      >> org.apache.flink.runtime.io
>>    
>> <http://org.apache.flink.runtime.io>.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
>>
>>      >>                 at
>>      >> org.apache.flink.runtime.io
>>    
>> <http://org.apache.flink.runtime.io>.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
>>
>>      >>                 at
>>      >>                 org.apache.flink.streaming.runtime.io
>>    
>> <http://runtime.io>.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>
>>      >>                 at
>>      >>                 org.apache.flink.streaming.runtime.io
>>    
>> <http://runtime.io>.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>
>>      >>                 at
>>      >>      
>>  org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>
>>      >>                 at
>>      >>      
>>  org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>
>>      >>                 at
>>      >>      
>>  org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>
>>      >>                 at
>>      >>      
>>  org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>
>>      >>                 at
>>      >>      
>>  org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>      >>                 at
>>      >>      
>>  org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>      >>                 at java.lang.Thread.run(Thread.java:748)
>>      >>
>>      >>                 I have, however, checked that the serialization
>>     works
>>      >>                 properly and there is no issue there.
>>      >>                 I have the following registration during
>> bootstrap:
>>      >>
>>      >>                 flink.registerType(classOf[Json])
>>      >>                 flink.addDefaultKryoSerializer(classOf[Json],
>>      >>                 classOf[JsonKryoSerializer])
>>      >>
>>      >>                 And the following is the implementation of the
>>      >>                 serializer:
>>      >>
>>      >>                 import com.esotericsoftware.kryo.io.{ Input,
>>     Output }
>>      >>                 import com.esotericsoftware.kryo.{ Kryo,
>>     Serializer }
>>      >>                 import io.circe.Json
>>      >>                 import io.circe.jawn.JawnParser
>>      >>
>>      >>                 final class JsonKryoSerializer extends
>>      >>                 Serializer[Json](true, false) with Serializable {
>>      >>                   private val jawnParser = new JawnParser()
>>      >>
>>      >>                   override def write(kryo: Kryo, output: Output,
>>      >>                 `object`: Json): Unit =
>>      >>                     output.writeString(`object`.noSpaces)
>>      >>
>>      >>                   override def read(kryo: Kryo, input: Input,
>>     `type`:
>>      >>                 Class[Json]): Json = {
>>      >>                     val str = input.readString()
>>      >>
>>      >>                     if (str == null) Json.Null
>>      >>                     else
>>      >>                       jawnParser.parse(str) match {
>>      >>                         case Left(err)    => throw err
>>      >>                         case Right(value) => value
>>      >>                       }
>>      >>                   }
>>      >>
>>      >>                   override def copy(kryo: Kryo, original: Json):
>>     Json =
>>      >>                     jawnParser.parse(original.noSpaces) match {
>>      >>                       case Left(err)    => throw err
>>      >>                       case Right(value) => value
>>      >>                     }
>>      >>                 }
>>      >>
>>      >>                 Would appreciate any help on how to debug this
>>     further.
>>      >>
>>      >>                 --
>>      >>                 Best Regards,
>>      >>                 Yuval Itzchakov.
>>      >>
>>      >>
>>      >>
>>      >>             --
>>      >>             Best Regards,
>>      >>             Yuval Itzchakov.
>>      >>
>>      >>
>>      >>
>>      >>         --
>>      >>         Best Regards,
>>      >>         Yuval Itzchakov.
>>      >
>>      >
>>      >
>>      >
>>      >     --
>>      >     Best Regards,
>>      >     Yuval Itzchakov.
>>      >
>>
>>
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.
>

Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.12 Kryo Serialization Error

Timo Walther
Forgot to add the link:

https://github.com/twalthr/flink/tree/kryoBug_ser

Regards,
Timo


On 18.01.21 14:11, Timo Walther wrote:

> I ported the code to the Flink code base. Because I had issues with SBT
> and Scala 2.12. Note it uses an older version of circe. I'm just pasting
> it here in case it helps someone.
>
> Regards,
> Timo
>
> On 18.01.21 13:51, Timo Walther wrote:
>> Hi Yuval,
>>
>> thanks for sharing some code with us. I scanned the code but could not
>> find anything suspicious from an API perspective. By using the full
>> RAW serializable string, we should actually be on the save side when
>> it comes to configure the Kryo serializer.
>>
>> I would suggest to further investigate in the checkpointing area if it
>> only occurs when checkpointing is enabled.
>>
>> Regards,
>> Timo
>>
>> On 13.01.21 13:35, Yuval Itzchakov wrote:
>>> Hi Timo and Piotr,
>>>
>>> Let me try and answer all your questions:
>>>
>>> Piotr:
>>>
>>> 1. Yes, I am using Flink 1.12.0
>>> 2. I have no tried downgrading to Flink 1.11.3, as I have features
>>> that are specific to 1.12 that I need (namely the ability to create
>>> a DataStreamScanProvider which was not available in previous versions)
>>> 3. I am using a pretty standard configuration. The only thing I've
>>> set was checkpointing (using the default MemoryStateBackend):
>>>
>>> image.png
>>> 4. This is the interesting bit. When I try to create a small
>>> reproduction outside the codebase, using a simple source the issue
>>> does not reproduce, both with default Kryo serialization and with my
>>> own Kryo serializer.
>>> 5. No, here is the relevant bit of build.sbt (flinkVersion is set to
>>> 1.12)
>>> image.png
>>> 6. I am trying to come up with a reproduction, thus far with no luck.
>>> Here's what I have so far:
>>> https://github.com/YuvalItzchakov/flink-bug-repro 
>>> <https://github.com/YuvalItzchakov/flink-bug-repro>. I am afraid that
>>> there are many more moving parts that are affecting this issue (I
>>> have a custom flink source and sink involved)
>>>
>>> Timo:
>>>
>>> I am explicitly passing a serialized string of my custom Kryo
>>> serializer to the UDF (see
>>> https://github.com/YuvalItzchakov/flink-bug-repro/blob/master/src/main/scala/org/yuvalitzchakov/bugrepro/BugRepro.scala#L31 
>>> <https://github.com/YuvalItzchakov/flink-bug-repro/blob/master/src/main/scala/org/yuvalitzchakov/bugrepro/BugRepro.scala#L31>).
>>> I can validate that both serialization and deserialization invoke the
>>> method defined on my custom serializer, if that's what you mean.
>>> Otherwise, if there's a mismatch between the two serializers Flink
>>> blows up at runtime saying that the types don't match.
>>>
>>> On Wed, Jan 13, 2021 at 1:19 PM Timo Walther <[hidden email]
>>> <mailto:[hidden email]>> wrote:
>>>
>>>     Hi Yuval,
>>>
>>>     could you share a reproducible example with us?
>>>
>>>     I see you are using SQL / Table API with a RAW type. I could imagine
>>>     that the KryoSerializer is configured differently when
>>> serializing and
>>>     when deserializing. This might be due to `ExecutionConfig` not
>>> shipped
>>>     (or copied) through the stack correctly.
>>>
>>>     Even though an error in the stack should be visible immediately
>>> and not
>>>     after 30 seconds, I still would also investigate an error in this
>>>     direction.
>>>
>>>     Regards,
>>>     Timo
>>>
>>>
>>>     On 13.01.21 09:47, Piotr Nowojski wrote:
>>>      > Hi Yuval,
>>>      >
>>>      > I have a couple of questions:
>>>      >
>>>      > 1. I see that you are using Flink 1.12.0, is that correct?
>>>      > 2. Have you tried running your application with a different Flink
>>>      > version? If you are using 1.12.0, could you check Flink 1.11.3,
>>>     or vice
>>>      > versa?
>>>      > 3. What's the configuration that you are using? For example, have
>>>     you
>>>      > enabled unaligned checkpoints or some other feature?
>>>      > 4. Is the problem still there if you replace Kryo with something
>>>     else
>>>      > (Java's serialisation?)?
>>>      > 5. Could it be a problem with dependency convergence? Like maybe
>>>     there
>>>      > are different versions of Flink jars present during runtime?
>>>      > 6. Lastly, would it be possible for you to prepare a minimal
>>> example
>>>      > that could reproduce the problem?
>>>      >
>>>      > Piotrek
>>>      >
>>>      > wt., 12 sty 2021 o 17:19 Yuval Itzchakov <[hidden email]
>>>     <mailto:[hidden email]>
>>>      > <mailto:[hidden email] <mailto:[hidden email]>>>
>>> napisał(a):
>>>      >
>>>      >     Hi Chesnay,
>>>      >     Turns out it didn't actually work, there were one or two
>>>      >     successful runs but the problem still persists (it's a bit
>>> non
>>>      >     deterministic, and doesn't always reproduce when parallelism
>>>     is set
>>>      >     to 1).
>>>      >
>>>      >     I turned off all Kryo custom serialization and am only using
>>>     Flink
>>>      >     provided one's ATM, the problem still persists.
>>>      >     There seems to be an issue with how Flink serializes these
>>>     raw types
>>>      >     over the wire, but I still can't put my finger as to what the
>>>      >     problem is.
>>>      >
>>>      >     What I can see is that Flink tries to consume a
>>>     HybridMemorySegment
>>>      >     which contains one of these custom raw types I have and
>>>     because of
>>>      >     malformed content it receives a negative length for the byte
>>>     array:
>>>      >
>>>      >     image.png
>>>      >
>>>      >     Content seems to be prepended with a bunch of NULL values
>>> which
>>>      >     through off the length calculation:
>>>      >
>>>      >     image.png
>>>      >
>>>      >     But I still don't have the entire chain of execution wrapped
>>>      >     mentally in my head, trying to figure it out.
>>>      >
>>>      >     An additional error I'm receiving, even when removing the
>>>      >     problematic JSON field and switching it out for a String:
>>>      >
>>>      >     java.lang.IllegalStateException: When there are multiple
>>>     buffers, an
>>>      >     unfinished bufferConsumer can not be at the head of the
>>>     buffers queue.
>>>      >     at
>>>      >
>>>  org.apache.flink.util.Preconditions.checkState(Preconditions.java:198)
>>>      >     ~[flink-core-1.12.0.jar:1.12.0]
>>>      >     at
>>>      > org.apache.flink.runtime.io
>>> <http://org.apache.flink.runtime.io>.network.partition.PipelinedSubpartition.pollBuffer(PipelinedSubpartition.java:277)
>>>
>>>      >     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>>>      >     at
>>>      > org.apache.flink.runtime.io
>>> <http://org.apache.flink.runtime.io>.network.partition.PipelinedSubpartitionView.getNextBuffer(PipelinedSubpartitionView.java:51)
>>>
>>>      >     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>>>      >     at
>>>      > org.apache.flink.runtime.io
>>> <http://org.apache.flink.runtime.io>.network.partition.consumer.LocalInputChannel.getNextBuffer(LocalInputChannel.java:214)
>>>
>>>      >     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>>>      >     at
>>>      > org.apache.flink.runtime.io
>>> <http://org.apache.flink.runtime.io>.network.partition.consumer.SingleInputGate.waitAndGetNextData(SingleInputGate.java:626)
>>>
>>>      >     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>>>      >     at
>>>      > org.apache.flink.runtime.io
>>> <http://org.apache.flink.runtime.io>.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:603)
>>>
>>>      >     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>>>      >     at
>>>      > org.apache.flink.runtime.io
>>> <http://org.apache.flink.runtime.io>.network.partition.consumer.SingleInputGate.pollNext(SingleInputGate.java:591)
>>>
>>>      >     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>>>      >     at
>>>      >
>>>  org.apache.flink.runtime.taskmanager.InputGateWithMetrics.pollNext(InputGateWithMetrics.java:109)
>>>
>>>      >     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>>>      >     at
>>>      >     org.apache.flink.streaming.runtime.io
>>> <http://runtime.io>.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:142)
>>>
>>>      >     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>>>      >     at
>>>      >     org.apache.flink.streaming.runtime.io
>>> <http://runtime.io>.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:157)
>>>
>>>      >     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>>>      >     at
>>>      >     org.apache.flink.streaming.runtime.io
>>> <http://runtime.io>.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>>
>>>      >     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>>>      >     at
>>>      >
>>>  org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>>
>>>      >     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>>>      >     at
>>>      >
>>>  org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>
>>>      >     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>>>      >     at
>>>      >
>>>  org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>>
>>>      >     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>>>      >     at
>>>      >
>>>  org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>>
>>>      >     ~[flink-streaming-java_2.12-1.12.0.jar:1.12.0]
>>>      >     at
>>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>      >     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>>>      >     at
>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>      >     ~[flink-runtime_2.12-1.12.0.jar:1.12.0]
>>>      >     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_275]
>>>      >
>>>      >     On Tue, Jan 12, 2021 at 4:14 PM Chesnay Schepler
>>>     <[hidden email] <mailto:[hidden email]>
>>>      >     <mailto:[hidden email] <mailto:[hidden email]>>>
>>> wrote:
>>>      >
>>>      >         I would think that the likely explanation is some bug
>>> in the
>>>      >         formatting code of the library you are using.
>>>      >         Just for fun you could try manually removing all spaces
>>>     within
>>>      >         write and see how that turns out. (let's ignore for
>>> now that
>>>      >         this might also affect keys&values).
>>>      >
>>>      >         On 1/12/2021 2:38 PM, Yuval Itzchakov wrote:
>>>      >>         OK, this turned out to actually be a problem with the
>>> Kryo
>>>      >>         serialization. For some reason, it does not like that I
>>>     try to
>>>      >>         generate a JSON with no spaces, only when I use 2 spaces
>>>     will
>>>      >>         it work properly.
>>>      >>         I am at loss of words.
>>>      >>
>>>      >>         Just to emphasize the difference:
>>>      >>
>>>      >>         No Spaces:
>>>      >>
>>>      >>         image.png
>>>      >>         Doesn't work.
>>>      >>
>>>      >>         With spaces:
>>>      >>
>>>      >>         image.png
>>>      >>
>>>      >>         works fine.
>>>      >>
>>>      >>
>>>      >>
>>>      >>         On Mon, Jan 11, 2021 at 3:01 PM Yuval Itzchakov
>>>      >>         <[hidden email] <mailto:[hidden email]>
>>>     <mailto:[hidden email] <mailto:[hidden email]>>> wrote:
>>>      >>
>>>      >>             Further debugging this issue, this currently seems
>>>      >>             unrelated to Kryo at all.
>>>      >>
>>>      >>             I have a stage that emits a case class down the
>>>     stream. I
>>>      >>             can see the serialization part works fine, but
>>> when the
>>>      >>             receiving side is attempting to deserialize the the
>>>     case class
>>>      >>             it receives a NonSpanningWrapper that has already
>>>      >>             surpassed it's buffer limit:
>>>      >>
>>>      >>             image.png
>>>      >>
>>>      >>             Any help would be greatly appreciated.
>>>      >>
>>>      >>             On Mon, Jan 11, 2021 at 1:49 PM Yuval Itzchakov
>>>      >>             <[hidden email] <mailto:[hidden email]>
>>>     <mailto:[hidden email] <mailto:[hidden email]>>> wrote:
>>>      >>
>>>      >>                 Hi,
>>>      >>
>>>      >>                 I've implemented a KryoSerializer for a specific
>>>     JSON
>>>      >>                 type in my application as I have a bunch of UDFs
>>>      >>                 that depend on a RAW('io.circe.Json') encoder
>>> being
>>>      >>                 available. The implementation is rather simple.
>>>     When I
>>>      >>                 run my Flink application with Kryo in trace
>>> logs, I
>>>      >>                 see that data gets properly serialized /
>>>     deserialized
>>>      >>                 using the serializer. However, after about 30
>>>     seconds,
>>>      >>                 the application blows up with the following
>>> error:
>>>      >>
>>>      >>                 Caused by: java.io.IOException: Serializer
>>> consumed
>>>      >>                 more bytes than the record had. This indicates
>>>     broken
>>>      >>                 serialization. If you are using custom
>>> serialization
>>>      >>                 types (Value or Writable), check their
>>> serialization
>>>      >>                 methods. If you are using a Kryo-serialized
>>> type,
>>>      >>                 check the corresponding Kryo serializer.
>>>      >>                 at
>>>      >> org.apache.flink.runtime.io
>>> <http://org.apache.flink.runtime.io>.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
>>>
>>>      >>                 at
>>>      >> org.apache.flink.runtime.io
>>> <http://org.apache.flink.runtime.io>.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
>>>
>>>      >>                 at
>>>      >> org.apache.flink.runtime.io
>>> <http://org.apache.flink.runtime.io>.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
>>>
>>>      >>                 at
>>>      >>                 org.apache.flink.streaming.runtime.io
>>> <http://runtime.io>.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>>
>>>      >>                 at
>>>      >>                 org.apache.flink.streaming.runtime.io
>>> <http://runtime.io>.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>>
>>>      >>                 at
>>>      >>
>>>  org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>>
>>>      >>                 at
>>>      >>
>>>  org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>
>>>      >>                 at
>>>      >>
>>>  org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>>
>>>      >>                 at
>>>      >>
>>>  org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>>
>>>      >>                 at
>>>      >>  org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>      >>                 at
>>>      >>  org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>      >>                 at java.lang.Thread.run(Thread.java:748)
>>>      >>                 Caused by:
>>> java.lang.IndexOutOfBoundsException: pos:
>>>      >>                 140513145180741, length: 733793654, index: 69,
>>>     offset: 0
>>>      >>                 at
>>>      >>
>>>  org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:198)
>>>
>>>      >>                 at
>>>      >> org.apache.flink.runtime.io
>>> <http://org.apache.flink.runtime.io>.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101)
>>>
>>>      >>                 at
>>>      >> org.apache.flink.runtime.io
>>> <http://org.apache.flink.runtime.io>.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92)
>>>
>>>      >>                 at
>>>      >>
>>>  org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:103)
>>>
>>>      >>                 at
>>>      >>
>>>  org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
>>>
>>>      >>                 at
>>>      >>
>>>  org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
>>>
>>>      >>                 at
>>>      >>
>>>  org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>>
>>>      >>                 at
>>>      >>
>>>  org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>>
>>>      >>                 at
>>>      >>
>>>  org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>
>>>      >>                 at
>>>      >> org.apache.flink.runtime.io
>>> <http://org.apache.flink.runtime.io>.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
>>>
>>>      >>                 ... 11 more
>>>      >>
>>>      >>                 Or with the following exception:
>>>      >>
>>>      >>                 Caused by: java.lang.NegativeArraySizeException
>>>      >>                 at
>>>      >>
>>>  org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:102)
>>>
>>>      >>                 at
>>>      >>
>>>  org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:106)
>>>
>>>      >>                 at
>>>      >>
>>>  org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:50)
>>>
>>>      >>                 at
>>>      >>
>>>  org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>>
>>>      >>                 at
>>>      >>
>>>  org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>>
>>>      >>                 at
>>>      >>
>>>  org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>
>>>      >>                 at
>>>      >> org.apache.flink.runtime.io
>>> <http://org.apache.flink.runtime.io>.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:335)
>>>
>>>      >>                 at
>>>      >> org.apache.flink.runtime.io
>>> <http://org.apache.flink.runtime.io>.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:108)
>>>
>>>      >>                 at
>>>      >> org.apache.flink.runtime.io
>>> <http://org.apache.flink.runtime.io>.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85)
>>>
>>>      >>                 at
>>>      >>                 org.apache.flink.streaming.runtime.io
>>> <http://runtime.io>.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
>>>
>>>      >>                 at
>>>      >>                 org.apache.flink.streaming.runtime.io
>>> <http://runtime.io>.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>>>
>>>      >>                 at
>>>      >>
>>>  org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
>>>
>>>      >>                 at
>>>      >>
>>>  org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>
>>>      >>                 at
>>>      >>
>>>  org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
>>>
>>>      >>                 at
>>>      >>
>>>  org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
>>>
>>>      >>                 at
>>>      >>  org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>      >>                 at
>>>      >>  org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>      >>                 at java.lang.Thread.run(Thread.java:748)
>>>      >>
>>>      >>                 I have, however, checked that the serialization
>>>     works
>>>      >>                 properly and there is no issue there.
>>>      >>                 I have the following registration during
>>> bootstrap:
>>>      >>
>>>      >>                 flink.registerType(classOf[Json])
>>>      >>                 flink.addDefaultKryoSerializer(classOf[Json],
>>>      >>                 classOf[JsonKryoSerializer])
>>>      >>
>>>      >>                 And the following is the implementation of the
>>>      >>                 serializer:
>>>      >>
>>>      >>                 import com.esotericsoftware.kryo.io.{ Input,
>>>     Output }
>>>      >>                 import com.esotericsoftware.kryo.{ Kryo,
>>>     Serializer }
>>>      >>                 import io.circe.Json
>>>      >>                 import io.circe.jawn.JawnParser
>>>      >>
>>>      >>                 final class JsonKryoSerializer extends
>>>      >>                 Serializer[Json](true, false) with
>>> Serializable {
>>>      >>                   private val jawnParser = new JawnParser()
>>>      >>
>>>      >>                   override def write(kryo: Kryo, output: Output,
>>>      >>                 `object`: Json): Unit =
>>>      >>                     output.writeString(`object`.noSpaces)
>>>      >>
>>>      >>                   override def read(kryo: Kryo, input: Input,
>>>     `type`:
>>>      >>                 Class[Json]): Json = {
>>>      >>                     val str = input.readString()
>>>      >>
>>>      >>                     if (str == null) Json.Null
>>>      >>                     else
>>>      >>                       jawnParser.parse(str) match {
>>>      >>                         case Left(err)    => throw err
>>>      >>                         case Right(value) => value
>>>      >>                       }
>>>      >>                   }
>>>      >>
>>>      >>                   override def copy(kryo: Kryo, original: Json):
>>>     Json =
>>>      >>                     jawnParser.parse(original.noSpaces) match {
>>>      >>                       case Left(err)    => throw err
>>>      >>                       case Right(value) => value
>>>      >>                     }
>>>      >>                 }
>>>      >>
>>>      >>                 Would appreciate any help on how to debug this
>>>     further.
>>>      >>
>>>      >>                 --
>>>      >>                 Best Regards,
>>>      >>                 Yuval Itzchakov.
>>>      >>
>>>      >>
>>>      >>
>>>      >>             --
>>>      >>             Best Regards,
>>>      >>             Yuval Itzchakov.
>>>      >>
>>>      >>
>>>      >>
>>>      >>         --
>>>      >>         Best Regards,
>>>      >>         Yuval Itzchakov.
>>>      >
>>>      >
>>>      >
>>>      >
>>>      >     --
>>>      >     Best Regards,
>>>      >     Yuval Itzchakov.
>>>      >
>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Yuval Itzchakov.
>>
>