"No more bytes left" at deserialization

"No more bytes left" at deserialization

Timur Fayruzov
Hello,

I'm running a Flink program that is failing with the following exception:

2016-04-23 02:00:38,947 ERROR org.apache.flink.client.CliFrontend                           - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:136)
at com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
at com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
at scala.Option.foreach(Option.scala:257)
at com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:48)
at com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:123)) -> Filter (Filter at com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:124))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: No more bytes left.
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: No more bytes left.
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: No more bytes left.
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.io.EOFException: No more bytes left.
at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
at com.esotericsoftware.kryo.io.Input.readUtf8_slow(Input.java:542)
at com.esotericsoftware.kryo.io.Input.readUtf8(Input.java:535)
at com.esotericsoftware.kryo.io.Input.readString(Input.java:465)
at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeStringField.read(UnsafeCacheFields.java:198)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:764)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:67)
at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:28)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:163)
at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

A simplified version of the code looks more or less like the following:
```
case class Name(first: String, last: String)
case class Phone(number: String)
case class Address(addr: String, city: String, country: String)
case class Record(name: Name, phone: Option[Phone], address: Option[Address])
...
def resolutionFunc: (Iterator[Record], Iterator[(Name, String)]) => String = ...
...
val data = env.readCsvFile[MySchema](...).map(Record(_))

val helper: DataSet[(Name, String)] = ...

val result = data.filter(_.address.isDefined)
  .coGroup(helper)
  .where(e => LegacyDigest.buildMessageDigest((e.name, e.address.get.country)))
  .equalTo(e => LegacyDigest.buildMessageDigest((e._1, e._2)))
  .apply {resolutionFunc}
  .filter(_ != "")

result.writeAsText(...)
```

This code fails only when I run it on the full dataset; when I split `data` into smaller chunks (`helper` always stays the same), it completes successfully. I guess that with smaller memory requirements, serialization/deserialization does not kick in.

I'm now trying to explicitly set the Protobuf serializer for Kryo:
```
env.getConfig.registerTypeWithKryoSerializer(classOf[Record], classOf[ProtobufSerializer])
```
but every run takes significant time before failing, so any other advice is appreciated.
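
For reference, the other Kryo-related ExecutionConfig knobs I'm aware of look roughly like this (a sketch only; I haven't verified that any of them fixes the problem):
```
// Sketch of Kryo-related registration variants on the Flink 1.0 ExecutionConfig.
// None of these is verified to fix the EOF; they only change how Kryo handles
// the types involved.
env.getConfig.enableForceKryo()                  // route POJO types through Kryo as well
env.getConfig.registerKryoType(classOf[Record])  // pre-register types so Kryo writes
env.getConfig.registerKryoType(classOf[Name])    // compact ids instead of class names
```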

Thanks,
Timur
Re: "No more bytes left" at deserialization

Timur Fayruzov
Trying to use ProtobufSerializer, the program consistently fails with the following exception:

java.lang.IllegalStateException: Update task on instance 52b9d8ffa1c0bb7251a9731c565460eb @ ip-10-105-200-137 - 1 slots - URL: akka.tcp://flink@10.105.200.137:48990/user/taskmanager failed due to:
at org.apache.flink.runtime.executiongraph.Execution$6.onFailure(Execution.java:954)
at akka.dispatch.OnFailure.internal(Future.scala:228)
at akka.dispatch.OnFailure.internal(Future.scala:227)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@10.105.200.137:48990/user/taskmanager#1418296501]] after [10000 ms]
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
at java.lang.Thread.run(Thread.java:745)

I'm at my wits' end now; any suggestions are highly appreciated.

Thanks,
Timur


Re: "No more bytes left" at deserialization

rmetzger0
For the second exception, can you check the logs of the failing taskmanager (10.105.200.137)?
I guess those logs contain some details on why the TM timed out.


Are you on 1.0.x or on 1.1-SNAPSHOT?
We recently changed something related to the ExecutionConfig, which has led to Kryo issues in the past.



Re: "No more bytes left" at deserialization

Timur Fayruzov
Hello Robert,

I'm on 1.0.0, compiled with Scala 2.11. The second exception was an issue with the cluster (which I didn't dig into); after I restarted the cluster I was able to get past it. Now I have the following exception:

java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:158)) -> Filter (Filter at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:159))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:254)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:259)
at org.apache.flink.types.StringValue.readString(StringValue.java:771)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
... 5 more

Thanks,
Timur

Re: "No more bytes left" at deserialization

Timur Fayruzov
Still trying to resolve this serialization issue. I was able to hack around it by 'serializing' `Record` to a String and then 'deserializing' it in the coGroup, but boy, it's so ugly.
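
Roughly, the hack looks like the sketch below (based on the simplified classes from my first message; `encode`/`decode` stand in for the real flattening code, and the `|`/`,` delimiters are assumed never to appear in the data, which is exactly why this is ugly):
```
// Hypothetical sketch of the String round-trip hack, assuming the usual
// `import org.apache.flink.api.scala._` and the simplified classes above.

// Flatten a Record to a delimited String...
def encode(r: Record): String = Seq(
  r.name.first,
  r.name.last,
  r.phone.map(_.number).getOrElse(""),
  r.address.map(a => s"${a.addr},${a.city},${a.country}").getOrElse("")
).mkString("|")

// ...and parse it back.
def decode(s: String): Record = {
  val Array(first, last, phone, addr) = s.split("\\|", -1)
  Record(
    Name(first, last),
    if (phone.isEmpty) None else Some(Phone(phone)),
    if (addr.isEmpty) None
    else {
      val Array(a, city, country) = addr.split(",", -1)
      Some(Address(a, city, country))
    })
}

// The pipeline then co-groups plain Strings (which Flink serializes natively)
// and only decodes back inside the coGroup function:
val result = data.filter(_.address.isDefined)
  .map(encode)
  .coGroup(helper)
  .where { s => val r = decode(s); LegacyDigest.buildMessageDigest((r.name, r.address.get.country)) }
  .equalTo(e => LegacyDigest.buildMessageDigest((e._1, e._2)))
  .apply { (left: Iterator[String], right: Iterator[(Name, String)]) =>
    resolutionFunc(left.map(decode), right)
  }
  .filter(_ != "")
```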

So the bug is that Flink can't deserialize a case class with the following structure (slightly different from, and more detailed than, what I stated above):
```
case class Record(name: Name, phone: Option[Phone], address: Option[Address])

case class Name(givenName: Option[String], middleName: Option[String], familyName: Option[String], generationSuffix: Option[String] = None)

trait Address{
  val city: String
  val state: String
  val country: String
  val latitude: Double
  val longitude: Double
  val postalCode: String
  val zip4: String
  val digest: String
}


case class PoBox(city: String,
                 state: String,
                 country: String,
                 latitude: Double,
                 longitude: Double,
                 postalCode: String,
                 zip4: String,
                 digest: String,
                 poBox: String
                ) extends Address

case class PostalAddress(city: String,
                         state: String,
                         country: String,
                         latitude: Double,
                         longitude: Double,
                         postalCode: String,
                         zip4: String,
                         digest: String,
                         preDir: String,
                         streetName: String,
                         streetType: String,
                         postDir: String,
                         house: String,
                         aptType: String,
                         aptNumber: String
                        ) extends Address
```

I would expect serialization to be one of Flink's cornerstones and well tested, so chances are high that I'm doing something wrong, but I can't really find anything unusual in my code.
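
One thing I still plan to try (a sketch, unverified): since `Address` is a plain trait, Flink falls back to Kryo for that field anyway, so registering the concrete subclasses up front might at least make the serialization deterministic:
```
// Sketch: pre-register the concrete Address subclasses with the environment.
// Whether this helps with the deserialization failure is unverified.
env.registerType(classOf[PoBox])
env.registerType(classOf[PostalAddress])
```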

Any suggestions on what to try are highly welcome.

Thanks,
Timur


at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: No more bytes left.
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.io.EOFException: No more bytes left.
at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
at com.esotericsoftware.kryo.io.Input.readUtf8_slow(Input.java:542)
at com.esotericsoftware.kryo.io.Input.readUtf8(Input.java:535)
at com.esotericsoftware.kryo.io.Input.readString(Input.java:465)
at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeStringField.read(UnsafeCacheFields.java:198)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:764)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:67)
at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:28)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:163)
at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

The simplified version of the code looks more or less like the following:
```
case class Name(first: String, last: String)
case class Phone(number: String)
case class Address(addr: String, city: String, country: String)
case class Record(name: Name, phone: Option[Phone], address: Option[Address])
...
def resolutionFunc: (Iterator[Record], Iterator[(Name, String)]) => String = ...
...
val data = env.readCsvFile[MySchema](...).map(Record(_))

val helper: DataSet[(Name, String)] = ...

val result = data.filter(_.address.isDefined)
  .coGroup(helper)
  .where(e => LegacyDigest.buildMessageDigest((e.name, e.address.get.country)))
  .equalTo(e => LegacyDigest.buildMessageDigest((e._1, e._2)))
  .apply {resolutionFunc}
  .filter(_ != "")

result.writeAsText(...)
```

This code fails only when I run it on the full dataset; when I split `data` into smaller chunks (`helper` always stays the same), it completes successfully. I guess with smaller memory requirements serialization/deserialization does not kick in.
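
To narrow down which serializer is actually involved, it can help to print the TypeInformation Flink derives for the type. A quick sketch, using the `Record` class above (`createTypeInformation` is the standard flink-scala macro; nothing else here is assumed):
```
import org.apache.flink.api.scala._

// Sketch: print the TypeInformation tree that Flink derives for Record.
// Fields rendered as GenericType<...> are handled by Kryo rather than
// Flink's built-in serializers.
val info = createTypeInformation[Record]
println(info)
```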

I'm now trying to explicitly set the Protobuf serializer for Kryo:
```
env.getConfig.registerTypeWithKryoSerializer(classOf[Record], classOf[ProtobufSerializer])
```
but every run takes significant time before failing, so any other advice is appreciated.
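
For comparison, a minimal sketch of plain registration without a custom serializer (`registerKryoType` is the standard ExecutionConfig call; registering the nested classes too, since Kryo works with the concrete runtime types, not only the declared top-level one):
```
// Sketch: register every concrete class Kryo will actually see at
// runtime, not only the top-level Record.
env.getConfig.registerKryoType(classOf[Record])
env.getConfig.registerKryoType(classOf[Name])
env.getConfig.registerKryoType(classOf[Phone])
env.getConfig.registerKryoType(classOf[Address])
```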

Thanks,
Timur





Re: "No more bytes left" at deserialization

Ufuk Celebi
Hey Timur,

I'm sorry about this bad experience.

From what I can tell, there is nothing unusual with your code. It's
probably an issue with Flink.

I think we have to wait a little longer to hear what others in the
community say about this.

@Aljoscha, Till, Robert: any ideas what might cause this?

– Ufuk


On Mon, Apr 25, 2016 at 6:50 PM, Timur Fayruzov
<[hidden email]> wrote:

> Still trying to resolve this serialization issue. I was able to hack it by
> 'serializing' `Record` to String and then 'deserializing' it in coGroup, but
> boy, it's so ugly.
>
> So the bug is that it can't deserialize the case class that has the
> structure (slightly different and more detailed than I stated above):
> ```
> case class Record(name: Name, phone: Option[Phone], address:
> Option[Address])
>
> case class Name(givenName: Option[String], middleName: Option[String],
> familyName: Option[String], generationSuffix: Option[String] = None)
>
> trait Address {
>   val city: String
>   val state: String
>   val country: String
>   val latitude: Double
>   val longitude: Double
>   val postalCode: String
>   val zip4: String
>   val digest: String
> }
>
>
> case class PoBox(city: String,
>                  state: String,
>                  country: String,
>                  latitude: Double,
>                  longitude: Double,
>                  postalCode: String,
>                  zip4: String,
>                  digest: String,
>                  poBox: String
>                 ) extends Address
>
> case class PostalAddress(city: String,
>                          state: String,
>                          country: String,
>                          latitude: Double,
>                          longitude: Double,
>                          postalCode: String,
>                          zip4: String,
>                          digest: String,
>                          preDir: String,
>                          streetName: String,
>                          streetType: String,
>                          postDir: String,
>                          house: String,
>                          aptType: String,
>                          aptNumber: String
>                         ) extends Address
> ```
>
> I would expect that serialization is one of Flink's cornerstones and should be
> well tested, so there is a high chance that I'm doing something wrong, but I
> can't really find anything unusual in my code.
>
> Any suggestion on what to try is highly welcome.
>
> Thanks,
> Timur
>
>
> On Sun, Apr 24, 2016 at 3:26 AM, Timur Fayruzov <[hidden email]>
> wrote:
>>
>> Hello Robert,
>>
>> I'm on 1.0.0 compiled with Scala 2.11. The second exception was an issue
>> with the cluster (which I didn't dig into); when I restarted the cluster I was
>> able to get past it. Now I have the following exception:
>>
>> java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup
>> at
>> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:158))
>> -> Filter (Filter at
>> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:159))'
>> , caused an error: Error obtaining the sorted input: Thread 'SortMerger
>> Reading Thread' terminated due to an exception: Serializer consumed more
>> bytes than the record had. This indicates broken serialization. If you are
>> using custom serialization types (Value or Writable), check their
>> serialization methods. If you are using a Kryo-serialized type, check the
>> corresponding Kryo serializer.
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger Reading Thread' terminated due to an exception:
>> Serializer consumed more bytes than the record had. This indicates broken
>> serialization. If you are using custom serialization types (Value or
>> Writable), check their serialization methods. If you are using a
>> Kryo-serialized type, check the corresponding Kryo serializer.
>> at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>> at
>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>> at
>> org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>> ... 3 more
>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>> terminated due to an exception: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>> at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>> Caused by: java.io.IOException: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>> at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>> at
>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>> at
>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>> at
>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>> at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>> at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>> at
>> org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
>> at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:254)
>> at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:259)
>> at org.apache.flink.types.StringValue.readString(StringValue.java:771)
>> at
>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>> at
>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>> at
>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>> at
>> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>> at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>> ... 5 more
>>
>> Thanks,
>> Timur

Re: "No more bytes left" at deserialization

Aljoscha Krettek
Could this be caused by the disabled reference tracking in our Kryo serializer? From the stack trace it looks like it's failing when trying to deserialize the traits that are wrapped in Options.
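
For context, a minimal standalone sketch of what reference tracking changes, using plain Kryo API (the ArrayList values are just stand-ins): with references disabled, every occurrence of an object is written out in full, and the writing and reading side must agree on the setting or the byte stream is misread.
```
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import java.util.ArrayList

// Sketch: the same object referenced twice in one record.
val kryo = new Kryo()
kryo.setReferences(false) // disabled, as in our KryoSerializer

val shared = new ArrayList[String](); shared.add("x")
val twice = new ArrayList[AnyRef](); twice.add(shared); twice.add(shared)

val out = new Output(1024)
kryo.writeClassAndObject(out, twice)
out.close()

val back = kryo.readClassAndObject(new Input(out.toBytes))
  .asInstanceOf[ArrayList[AnyRef]]
// With references disabled both occurrences were written in full,
// so they come back as two distinct objects:
println(back.get(0) eq back.get(1)) // false
```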


Re: "No more bytes left" at deserialization

Till Rohrmann

Hi Timur,

I’ve got good and not-so-good news. Let’s start with the not-so-good news: I couldn’t reproduce your problem. The good news is that I found a bug in the duplication logic of the OptionSerializer. I’ve already committed a patch to the master to fix it.
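
For readers hitting this later, a simplified model of what the duplication logic is about (illustration only -- MySerializer, KryoBacked and OptionWrapper are made-up names, not Flink's actual TypeSerializer API): stateful serializers are duplicated once per thread, so a wrapper whose duplicate() does not also duplicate its nested serializer leaves two threads sharing the same non-thread-safe internals.
```
// Simplified model of the duplicate() contract (illustration only).
trait MySerializer[T] {
  def duplicate(): MySerializer[T] // must deep-copy all stateful internals
}

class KryoBacked[T] extends MySerializer[T] {
  // Kryo instances are stateful and not thread-safe.
  private val kryo = new com.esotericsoftware.kryo.Kryo()
  def duplicate(): MySerializer[T] = new KryoBacked[T] // fresh instance: OK
}

// The bug pattern: a wrapper that reuses its nested serializer instead of
// duplicating it, so the "duplicate" still shares the non-thread-safe
// state underneath -- exactly what bites a multi-threaded sort-merger.
class OptionWrapper[T](nested: MySerializer[T]) extends MySerializer[Option[T]] {
  def duplicate(): MySerializer[Option[T]] =
    new OptionWrapper[T](nested) // WRONG: should be new OptionWrapper(nested.duplicate())
}
```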

Thus, I wanted to ask whether you could try out the latest master and check whether your problem still persists. If it does, could you send me your complete code with sample input data that reproduces the problem?
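
In case it saves a round trip, here is a skeleton of the kind of self-contained reproducer that helps (a sketch only: it assumes the Record/Name/PoBox classes from earlier in this thread, the Repro object name is arbitrary, and the record count is a placeholder -- the problem reportedly only shows up once the sorter starts spilling):
```
import org.apache.flink.api.scala._

object Repro {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Generate enough records carrying Option-wrapped trait values to
    // force the sort-merge path to spill and deserialize via Kryo.
    val records: DataSet[Record] = env.generateSequence(1, 2000000).map { i =>
      Record(Name(Some(s"first$i"), None, Some("last"), None),
             None,
             Some(PoBox("city", "st", "US", 0.0, 0.0, "00000", "0000", s"d$i", "box")))
    }

    val helper: DataSet[(Name, String)] = env.fromElements(
      (Name(Some("first1"), None, Some("last"), None), "x"))

    records.coGroup(helper)
      .where(_.name.toString)
      .equalTo(_._1.toString)
      .apply((l, r) => l.size + r.size)
      .print()
  }
}
```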

Cheers,
Till


On Tue, Apr 26, 2016 at 10:55 AM, Aljoscha Krettek <[hidden email]> wrote:
Could this be caused by the disabled reference tracking in our Kryo serializer? From the stack trace it looks like its failing when trying to deserialize the traits that are wrapped in Options.

On Tue, 26 Apr 2016 at 10:09 Ufuk Celebi <[hidden email]> wrote:
Hey Timur,

I'm sorry about this bad experience.

From what I can tell, there is nothing unusual with your code. It's
probably an issue with Flink.

I think we have to wait a little longer to hear what others in the
community say about this.

@Aljoscha, Till, Robert: any ideas what might cause this?

– Ufuk


On Mon, Apr 25, 2016 at 6:50 PM, Timur Fayruzov
<[hidden email]> wrote:
> Still trying to resolve this serialization issue. I was able to hack it by
> 'serializing' `Record` to String and then 'deserializing' it in coGroup, but
> boy its so ugly.
>
> So the bug is that it can't deserialize the case class that has the
> structure (slightly different and more detailed than I stated above):
> ```
> case class Record(name: Name, phone: Option[Phone], address:
> Option[Address])
>
> case class Name(givenName: Option[String], middleName: Option[String],
> familyName: Option[String], generationSuffix: Option[String] = None)
>
> trait Address{
>   val city: String
>   val state: String
>   val country: String
>   val latitude: Double
>   val longitude: Double
>   val postalCode: String
>   val zip4: String
>   val digest: String
> }
>
>
> case class PoBox(city: String,
>                  state: String,
>                  country: String,
>                  latitude: Double,
>                  longitude: Double,
>                  postalCode: String,
>                  zip4: String,
>                  digest: String,
>                  poBox: String
>                 ) extends Address
>
> case class PostalAddress(city: String,
>                          state: String,
>                          country: String,
>                          latitude: Double,
>                          longitude: Double,
>                          postalCode: String,
>                          zip4: String,
>                          digest: String,
>                          preDir: String,
>                          streetName: String,
>                          streetType: String,
>                          postDir: String,
>                          house: String,
>                          aptType: String,
>                          aptNumber: String
>                         ) extends Address
> ```
>
> I would expect that serialization is one of Flink cornerstones and should be
> well tested, so there is a high chance of me doing things wrongly, but I
> can't really find anything unusual in my code.
>
> Any suggestion what to try is highly welcomed.
>
> Thanks,
> Timur
>
>
> On Sun, Apr 24, 2016 at 3:26 AM, Timur Fayruzov <[hidden email]>
> wrote:
>>
>> Hello Robert,
>>
>> I'm on 1.0.0 compiled with Scala 2.11. The second exception was an issue
>> with a cluster (that I didn't dig into), when I restarted the cluster I was
>> able to go past it, so now I have the following exception:
>>
>> java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup
>> at
>> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:158))
>> -> Filter (Filter at
>> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:159))'
>> , caused an error: Error obtaining the sorted input: Thread 'SortMerger
>> Reading Thread' terminated due to an exception: Serializer consumed more
>> bytes than the record had. This indicates broken serialization. If you are
>> using custom serialization types (Value or Writable), check their
>> serialization methods. If you are using a Kryo-serialized type, check the
>> corresponding Kryo serializer.
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger Reading Thread' terminated due to an exception:
>> Serializer consumed more bytes than the record had. This indicates broken
>> serialization. If you are using custom serialization types (Value or
>> Writable), check their serialization methods. If you are using a
>> Kryo-serialized type, check the corresponding Kryo serializer.
>> at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>> at
>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>> at
>> org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>> ... 3 more
>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>> terminated due to an exception: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>> at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>> Caused by: java.io.IOException: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>> at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>> at
>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>> at
>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>> at
>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>> at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>> at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>> at
>> org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
>> at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:254)
>> at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:259)
>> at org.apache.flink.types.StringValue.readString(StringValue.java:771)
>> at
>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>> at
>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>> at
>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>> at
>> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>> at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>> ... 5 more
>>
>> Thanks,
>> Timur
>>


Re: "No more bytes left" at deserialization

Timur Fayruzov
Thank you Till.

I will try to run with the new binaries today. As I mentioned, the error is reproducible only on the full dataset, so coming up with sample input data may be problematic (not to mention that the real data can't be shared). I'll see if I can replicate it, but it could take a bit longer. Thank you very much for your effort.
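
One way to corner a failure that only shows up on the full dataset, without sharing the data, is to bisect it: rerun the job on a deterministic fraction of the input and shrink the fraction for as long as the failure persists. A sketch with a hypothetical helper (`keepFraction` is not from this thread):

```scala
import org.apache.flink.api.scala._

// Hash each record into one of `buckets` buckets and retain the first
// `keep` of them; vary `keep` (or which buckets are kept) between runs
// to home in on a small failing subset.
def keepFraction[T](ds: DataSet[T], buckets: Int, keep: Int): DataSet[T] =
  ds.filter(r => ((r.hashCode % buckets) + buckets) % buckets < keep)

// e.g. keepFraction(data, 16, 8) retains roughly half of `data`
```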

On Tue, Apr 26, 2016 at 8:46 AM, Till Rohrmann <[hidden email]> wrote:

Hi Timur,

I've got good news and not-so-good news. Let's start with the not-so-good news: I couldn't reproduce your problem. The good news is that I found a bug in the duplication logic of the OptionSerializer, and I've already committed a patch to the master to fix it.

Thus, I wanted to ask whether you could try out the latest master and check whether your problem still persists. If it does, could you send me your complete code, with sample input data that reproduces the problem?

Cheers,
Till
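
To make the shape of such a duplication bug concrete: Flink calls duplicate() on a serializer so that each sorter/reader thread gets its own instance. The sketch below uses self-contained stand-in types (not Flink's actual TypeSerializer or OptionSerializer code) to show how a duplicate() that returns a shared instance lets threads trample a stateful nested serializer:

```scala
// Stand-in serializer abstraction, for illustration only.
trait MySerializer[T] {
  def duplicate(): MySerializer[T]
}

class BuggyOptionSerializer[A](elem: MySerializer[A])
    extends MySerializer[Option[A]] {
  // Buggy: always returns `this`. If `elem` is stateful (e.g. wraps a Kryo
  // instance with read buffers), all threads end up sharing that state and
  // concurrent reads corrupt each other's position in the stream.
  override def duplicate(): MySerializer[Option[A]] = this
}

class FixedOptionSerializer[A](elem: MySerializer[A])
    extends MySerializer[Option[A]] {
  // Fixed: duplicate the nested serializer; reuse `this` only when the
  // nested serializer signals it is stateless by returning itself.
  override def duplicate(): MySerializer[Option[A]] = {
    val dup = elem.duplicate()
    if (dup eq elem) this else new FixedOptionSerializer(dup)
  }
}
```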


On Tue, Apr 26, 2016 at 10:55 AM, Aljoscha Krettek <[hidden email]> wrote:
Could this be caused by the disabled reference tracking in our Kryo serializer? From the stack trace it looks like it's failing when trying to deserialize the traits that are wrapped in Options.
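
For context on reference tracking: Kryo can replace repeated objects with small back-reference IDs, and the writer and reader must agree on that setting. A standalone-Kryo sketch of what a mismatch does (assumptions: plain Kryo outside Flink, and `P` is a stand-in POJO):

```scala
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

class P { var name: String = _ } // stand-in POJO with a no-arg constructor

val writer = new Kryo()
writer.setReferences(true)  // writer emits reference markers
writer.register(classOf[P])

val out = new Output(1024)
val p = new P; p.name = "timur"
writer.writeObject(out, p)

val reader = new Kryo()
reader.setReferences(false) // reader expects NO reference markers
reader.register(classOf[P])

// The leading reference varint is misread as field data, shifting every
// subsequent read: the stream either under-runs ("No more bytes left")
// or over-runs into the next record.
val broken = reader.readObject(new Input(out.toBytes), classOf[P])
```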

On Tue, 26 Apr 2016 at 10:09 Ufuk Celebi <[hidden email]> wrote:
Hey Timur,

I'm sorry about this bad experience.

From what I can tell, there is nothing unusual with your code. It's
probably an issue with Flink.

I think we have to wait a little longer to hear what others in the
community say about this.

@Aljoscha, Till, Robert: any ideas what might cause this?

– Ufuk


On Mon, Apr 25, 2016 at 6:50 PM, Timur Fayruzov
<[hidden email]> wrote:
> Still trying to resolve this serialization issue. I was able to hack around
> it by 'serializing' `Record` to String and then 'deserializing' it in
> coGroup, but boy, it's ugly.
>
> So the bug is that Flink can't deserialize a case class with the following
> structure (slightly different from, and more detailed than, what I stated
> above):
> ```
> case class Record(name: Name, phone: Option[Phone], address:
> Option[Address])
>
> case class Name(givenName: Option[String], middleName: Option[String],
> familyName: Option[String], generationSuffix: Option[String] = None)
>
> trait Address{
>   val city: String
>   val state: String
>   val country: String
>   val latitude: Double
>   val longitude: Double
>   val postalCode: String
>   val zip4: String
>   val digest: String
> }
>
>
> case class PoBox(city: String,
>                  state: String,
>                  country: String,
>                  latitude: Double,
>                  longitude: Double,
>                  postalCode: String,
>                  zip4: String,
>                  digest: String,
>                  poBox: String
>                 ) extends Address
>
> case class PostalAddress(city: String,
>                          state: String,
>                          country: String,
>                          latitude: Double,
>                          longitude: Double,
>                          postalCode: String,
>                          zip4: String,
>                          digest: String,
>                          preDir: String,
>                          streetName: String,
>                          streetType: String,
>                          postDir: String,
>                          house: String,
>                          aptType: String,
>                          aptNumber: String
>                         ) extends Address
> ```
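
A note on why this model exercises Kryo at all (an inference from the stack traces in this thread, not something stated in it): `Name` is a case class and gets Flink's CaseClassSerializer, but `Address` is a trait, so `Option[Address]` is analyzed as an Option of a generic type whose payload falls back to the KryoSerializer; this matches the OptionSerializer -> KryoSerializer nesting visible in the traces. One way to inspect this (sketch; the printed shape is illustrative):

```scala
import org.apache.flink.api.scala._

// Ask Flink's type analyzer how it sees Record; trait-typed fields show up
// as generic (Kryo-handled) types.
val info = createTypeInformation[Record]
println(info)
```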
>
> I would expect serialization to be one of Flink's cornerstones and to be
> well tested, so there is a high chance that I'm doing something wrong, but
> I can't really find anything unusual in my code.
>
> Any suggestion on what to try is highly welcome.
>
> Thanks,
> Timur



Re: "No more bytes left" at deserialization

Till Rohrmann
Then let's keep our fingers crossed that we've found the culprit :-)





Re: "No more bytes left" at deserialization

Timur Fayruzov
I built master with Scala 2.11 and Hadoop 2.7.1, and now I get a different exception (still serialization-related, though):

java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:162)) -> Filter (Filter at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:163))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 97, Size: 11
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 97, Size: 11
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 97, Size: 11
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.lang.IndexOutOfBoundsException: Index: 97, Size: 11
at java.util.ArrayList.rangeCheck(ArrayList.java:653)
at java.util.ArrayList.get(ArrayList.java:429)
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:75)
at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:28)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:163)
at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
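Since every failure so far dies inside the serializer stack, one way to probe it without a full cluster run is to round-trip a single record through the serializer Flink would pick for `DataSet[Record]`. The following is only a sketch under assumptions: the object name `SerializerRoundTrip` and the `sampleRecord` value are placeholders, the class names are those of recent Flink releases, and a single-threaded round-trip that passes would still not rule out the OptionSerializer duplication bug discussed elsewhere in the thread:

```
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.scala._
import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}

object SerializerRoundTrip extends App {
  // Placeholder: substitute a real Record value here.
  val sampleRecord: Record = ???

  // Build the serializer Flink would pick for DataSet[Record] and
  // round-trip the value through it in-process.
  val config = new ExecutionConfig
  val serializer = createTypeInformation[Record].createSerializer(config)

  val out = new ByteArrayOutputStream()
  serializer.serialize(sampleRecord, new DataOutputViewStreamWrapper(out))

  val in = new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray))
  val copy = serializer.deserialize(in)
  assert(copy == sampleRecord, "serializer round-trip changed the record")
}
```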








Re: "No more bytes left" at deserialization

Ken Krugler
I don’t know if this is helpful, but I’d run into a similar issue (array index out of bounds during Kryo deserialization) due to having a different version of Kryo on the classpath.

— Ken
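If a stray Kryo build is the suspect, the JVM can report where the class was actually loaded from; this is plain reflection, nothing Flink-specific. A minimal sketch (the object name `KryoOriginCheck` is a placeholder) to run on each machine, for example from inside a `map`:

```
object KryoOriginCheck extends App {
  // Print the jar (or directory) the Kryo class was loaded from.
  // getCodeSource may be null for bootstrap-loaded classes, hence Option.
  val source = Option(classOf[com.esotericsoftware.kryo.Kryo]
      .getProtectionDomain.getCodeSource)
    .map(_.getLocation)
  println(s"Kryo loaded from: ${source.getOrElse("unknown")}")
}
```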






--------------------------
Ken Krugler
+1 530-210-6378
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr




Re: "No more bytes left" at deserialization

Timur Fayruzov
Hi Ken,

Good point, thanks for pointing this out. In the Flink project I see a direct dependency on Kryo 2.24, and then a transitive dependency on Kryo 2.21 through twitter.carbonite. Also, twitter.chill, which as far as I understand is used to manipulate Kryo, shows up in versions 0.7.4 (via Flink) and 0.3.5 (via carbonite again). I wonder if this could cause issues: if classes are not deduplicated, class loading order could differ between machines. The Maven project setup is fairly complicated and I'm not a Maven expert, so I would appreciate a second look at that.
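One way to check whether duplicate classes are actually on the classpath is to ask the class loader for every resource that provides `Kryo.class`; more than one URL means load order really is in play. A small sketch using only the standard `ClassLoader` API (the object name `KryoClasspathCheck` is a placeholder):

```
import scala.collection.JavaConverters._

object KryoClasspathCheck extends App {
  // List every classpath entry that contains the Kryo class definition.
  // A single URL means deduplication worked; two or more mean different
  // machines (or class loaders) could resolve different Kryo builds.
  val copies = getClass.getClassLoader
    .getResources("com/esotericsoftware/kryo/Kryo.class")
    .asScala
    .toList
  copies.foreach(println)
}
```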

Thanks,
Timur


On Tue, Apr 26, 2016 at 6:51 PM, Ken Krugler <[hidden email]> wrote:
I don’t know if this is helpful, but I’d run into a similar issue (array index out of bounds during Kryo deserialization) due to having a different version of Kryo on the classpath.

— Ken

On Apr 26, 2016, at 6:23pm, Timur Fayruzov <[hidden email]> wrote:

I built master with scala 2.11 and hadoop 2.7.1, now get a different exception (still serialization-related though):

java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:162)) -> Filter (Filter at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:163))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 97, Size: 11
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 97, Size: 11
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 97, Size: 11
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.lang.IndexOutOfBoundsException: Index: 97, Size: 11
at java.util.ArrayList.rangeCheck(ArrayList.java:653)
at java.util.ArrayList.get(ArrayList.java:429)
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:75)
at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:28)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:163)
at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)



On Tue, Apr 26, 2016 at 9:07 AM, Till Rohrmann <[hidden email]> wrote:
Then let's keep finger crossed that we've found the culprit :-)

On Tue, Apr 26, 2016 at 6:02 PM, Timur Fayruzov <[hidden email]> wrote:
Thank you Till.

I will try to run with new binaries today. As I have mentioned, the error is reproducible only on a full dataset, so coming up with sample input data may be problematic (not to mention that the real data can't be shared). I'll see if I can replicate it, but could take a bit longer. Thank you very much for your effort.

On Tue, Apr 26, 2016 at 8:46 AM, Till Rohrmann <[hidden email]> wrote:

Hi Timur,

I’ve got good and not so good news. Let’s start with the not so good news. I couldn’t reproduce your problem but the good news is that I found a bug in the duplication logic of the OptionSerializer. I’ve already committed a patch to the master to fix it.

Thus, I wanted to ask you, whether you could try out the latest master and check whether your problem still persists. If that’s the case, could you send me your complete code with sample input data which reproduces your problem?

Cheers,
Till


On Tue, Apr 26, 2016 at 10:55 AM, Aljoscha Krettek <[hidden email]> wrote:
Could this be caused by the disabled reference tracking in our Kryo serializer? From the stack trace it looks like its failing when trying to deserialize the traits that are wrapped in Options.

On Tue, 26 Apr 2016 at 10:09 Ufuk Celebi <[hidden email]> wrote:
Hey Timur,

I'm sorry about this bad experience.

From what I can tell, there is nothing unusual with your code. It's
probably an issue with Flink.

I think we have to wait a little longer to hear what others in the
community say about this.

@Aljoscha, Till, Robert: any ideas what might cause this?

– Ufuk


On Mon, Apr 25, 2016 at 6:50 PM, Timur Fayruzov
<[hidden email]> wrote:
> Still trying to resolve this serialization issue. I was able to hack it by
> 'serializing' `Record` to String and then 'deserializing' it in coGroup, but
> boy its so ugly.
>
> So the bug is that it can't deserialize the case class that has the
> structure (slightly different and more detailed than I stated above):
> ```
> case class Record(name: Name, phone: Option[Phone], address:
> Option[Address])
>
> case class Name(givenName: Option[String], middleName: Option[String],
> familyName: Option[String], generationSuffix: Option[String] = None)
>
> trait Address{
>   val city: String
>   val state: String
>   val country: String
>   val latitude: Double
>   val longitude: Double
>   val postalCode: String
>   val zip4: String
>   val digest: String
> }
>
>
> case class PoBox(city: String,
>                  state: String,
>                  country: String,
>                  latitude: Double,
>                  longitude: Double,
>                  postalCode: String,
>                  zip4: String,
>                  digest: String,
>                  poBox: String
>                 ) extends Address
>
> case class PostalAddress(city: String,
>                          state: String,
>                          country: String,
>                          latitude: Double,
>                          longitude: Double,
>                          postalCode: String,
>                          zip4: String,
>                          digest: String,
>                          preDir: String,
>                          streetName: String,
>                          streetType: String,
>                          postDir: String,
>                          house: String,
>                          aptType: String,
>                          aptNumber: String
>                         ) extends Address
> ```
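
(An aside on the structure above, as a hedged sketch: because `Address` is a trait, Flink cannot analyze it as a composite type and falls back to Kryo for it. Registering the concrete subclasses up front at least makes Kryo's class registrations explicit and identical on every TaskManager; `env` is the job's ExecutionEnvironment.)

```scala
// Hedged sketch: register the concrete Address subclasses with Kryo so their
// registration IDs are fixed and the same on all TaskManagers. Class names
// match the definitions quoted above.
env.getConfig.registerKryoType(classOf[PoBox])
env.getConfig.registerKryoType(classOf[PostalAddress])
```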
>
> I would expect serialization to be one of Flink's cornerstones, and well
> tested, so there is a high chance that I'm doing something wrong, but I
> can't really find anything unusual in my code.
>
> Any suggestions on what to try are highly welcome.
>
> Thanks,
> Timur
>
>
> On Sun, Apr 24, 2016 at 3:26 AM, Timur Fayruzov <[hidden email]>
> wrote:
>>
>> Hello Robert,
>>
>> I'm on 1.0.0 compiled with Scala 2.11. The second exception was an issue
>> with a cluster (that I didn't dig into), when I restarted the cluster I was
>> able to go past it, so now I have the following exception:
>>
>> java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup
>> at
>> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:158))
>> -> Filter (Filter at
>> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:159))'
>> , caused an error: Error obtaining the sorted input: Thread 'SortMerger
>> Reading Thread' terminated due to an exception: Serializer consumed more
>> bytes than the record had. This indicates broken serialization. If you are
>> using custom serialization types (Value or Writable), check their
>> serialization methods. If you are using a Kryo-serialized type, check the
>> corresponding Kryo serializer.
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger Reading Thread' terminated due to an exception:
>> Serializer consumed more bytes than the record had. This indicates broken
>> serialization. If you are using custom serialization types (Value or
>> Writable), check their serialization methods. If you are using a
>> Kryo-serialized type, check the corresponding Kryo serializer.
>> at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>> at
>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>> at
>> org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>> ... 3 more
>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>> terminated due to an exception: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>> at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>> Caused by: java.io.IOException: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>> at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>> at
>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>> at
>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>> at
>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>> at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>> at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>> at
>> org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
>> at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:254)
>> at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:259)
>> at org.apache.flink.types.StringValue.readString(StringValue.java:771)
>> at
>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>> at
>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>> at
>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>> at
>> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>> at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>> ... 5 more
>>
>> Thanks,
>> Timur
>>
>> On Sun, Apr 24, 2016 at 2:45 AM, Robert Metzger <[hidden email]>
>> wrote:
>>>
>>> For the second exception, can you check the logs of the failing
>>> taskmanager (10.105.200.137)?
>>> I guess these logs contain some details on why the TM timed out.
>>>
>>>
>>> Are you on 1.0.x or on 1.1-SNAPSHOT?
>>> We recently changed something related to the ExecutionConfig, which has
>>> led to Kryo issues in the past.
>>>
>>>
>>> On Sun, Apr 24, 2016 at 11:38 AM, Timur Fayruzov
>>> <[hidden email]> wrote:
>>>>
>>>> Trying to use ProtobufSerializer -- the program consistently fails with the
>>>> following exception:
>>>>
>>>> java.lang.IllegalStateException: Update task on instance
>>>> 52b9d8ffa1c0bb7251a9731c565460eb @ ip-10-105-200-137 - 1 slots - URL:
>>>> akka.tcp://flink@10.105.200.137:48990/user/taskmanager failed due to:
>>>> at
>>>> org.apache.flink.runtime.executiongraph.Execution$6.onFailure(Execution.java:954)
>>>> at akka.dispatch.OnFailure.internal(Future.scala:228)
>>>> at akka.dispatch.OnFailure.internal(Future.scala:227)
>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
>>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>> at
>>>> scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
>>>> at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
>>>> at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134)
>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>>> at
>>>> scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> at
>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>> at
>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> at
>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>>>> [Actor[akka.tcp://flink@10.105.200.137:48990/user/taskmanager#1418296501]]
>>>> after [10000 ms]
>>>> at
>>>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
>>>> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>>>> at
>>>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
>>>> at
>>>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>>>> at
>>>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
>>>> at
>>>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
>>>> at
>>>> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
>>>> at
>>>> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
>>>> at
>>>> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> I'm at my wits' end now, any suggestions are highly appreciated.
>>>>
>>>> Thanks,
>>>> Timur
>>>>
>>>>
>>>> On Sun, Apr 24, 2016 at 2:26 AM, Timur Fayruzov
>>>> <[hidden email]> wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> I'm running a Flink program that is failing with the following
>>>>> exception:
>>>>>
>>>>> 2016-04-23 02:00:38,947 ERROR org.apache.flink.client.CliFrontend
>>>>> - Error while running the command.
>>>>> org.apache.flink.client.program.ProgramInvocationException: The program
>>>>> execution failed: Job execution failed.
>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>>>>> at
>>>>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>>>>> at
>>>>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>>>>> at
>>>>> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
>>>>> at
>>>>> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:136)
>>>>> at
>>>>> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
>>>>> at
>>>>> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
>>>>> at scala.Option.foreach(Option.scala:257)
>>>>> at
>>>>> com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:48)
>>>>> at
>>>>> com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at
>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>> at
>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>>> at
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>> at
>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>> at
>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>> at
>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>>>>> execution failed.
>>>>> at
>>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
>>>>> at
>>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>>>>> at
>>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>>>>> at
>>>>> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>>>> at
>>>>> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>>>> at
>>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>> at
>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>>>>> at
>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>>>>> at
>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>> at
>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>> Caused by: java.lang.Exception: The data preparation for task 'CHAIN
>>>>> CoGroup (CoGroup at
>>>>> com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:123)) ->
>>>>> Filter (Filter at
>>>>> com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:124))' ,
>>>>> caused an error: Error obtaining the sorted input: Thread 'SortMerger
>>>>> Reading Thread' terminated due to an exception: No more bytes left.
>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
>>>>> at
>>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an exception: No
>>>>> more bytes left.
>>>>> at
>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>> at
>>>>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>>>> at
>>>>> org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>>>> ... 3 more
>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>> terminated due to an exception: No more bytes left.
>>>>> at
>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>> Caused by: java.io.EOFException: No more bytes left.
>>>>> at
>>>>> org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
>>>>> at com.esotericsoftware.kryo.io.Input.readUtf8_slow(Input.java:542)
>>>>> at com.esotericsoftware.kryo.io.Input.readUtf8(Input.java:535)
>>>>> at com.esotericsoftware.kryo.io.Input.readString(Input.java:465)
>>>>> at
>>>>> com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeStringField.read(UnsafeCacheFields.java:198)
>>>>> at
>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:764)
>>>>> at
>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>>> at
>>>>> org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:67)
>>>>> at
>>>>> org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:28)
>>>>> at
>>>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
>>>>> at
>>>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
>>>>> at
>>>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
>>>>> at
>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>>>>> at
>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>> at
>>>>> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>> at
>>>>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:163)
>>>>> at
>>>>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>>> at
>>>>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>>> at
>>>>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>> at
>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>> at
>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>
>>>>> The simplified version of the code looks more or less like following:
>>>>> ```
>>>>> case class Name(first: String, last: String)
>>>>> case class Phone(number: String)
>>>>> case class Address(addr: String, city: String, country: String)
>>>>> case class Record(name: Name, phone: Option[Phone], address: Option[Address])
>>>>> ...
>>>>> def resolutionFunc: (Iterator[Record], Iterator[(Name, String)]) =>
>>>>> String = ...
>>>>> ...
>>>>> val data = env.readCsvFile[MySchema](...).map(Record(_))
>>>>>
>>>>> val helper: DataSet[(Name, String)] = ...
>>>>>
>>>>> val result = data.filter(_.address.isDefined)
>>>>>   .coGroup(helper)
>>>>>   .where(e => LegacyDigest.buildMessageDigest((e.name,
>>>>> e.address.get.country)))
>>>>>   .equalTo(e => LegacyDigest.buildMessageDigest((e._1, e._2)))
>>>>>   .apply {resolutionFunc}
>>>>>   .filter(_ != "")
>>>>>
>>>>> result.writeAsText(...)
>>>>> ```
>>>>>
>>>>> This code fails only when I run it on the full dataset; when I split
>>>>> `data` into smaller chunks (`helper` always stays the same), it
>>>>> completes successfully. I guess with smaller memory requirements
>>>>> serialization/deserialization does not kick in.
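
(A hedged aside on reproducing this with less data: the deserializer in the trace only takes its record-spanning code path when a record crosses a network buffer boundary, which small samples may never trigger. One cheap experiment, assuming the `data` DataSet from the snippet above, is to force an extra network shuffle:)

```scala
// Hedged experiment: rebalance() forces a full network exchange, so every
// Record is serialized and deserialized even on a small sample.
val shuffledData = data.rebalance()
```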
>>>>>
>>>>> I'm trying now to explicitly set Protobuf serializer for Kryo:
>>>>> ```
>>>>> env.getConfig.registerTypeWithKryoSerializer(classOf[Record],
>>>>> classOf[ProtobufSerializer])
>>>>>
>>>>> ```
>>>>> but every run takes significant time before failing, so any other
>>>>> advice is appreciated.
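
(A hedged note on this approach: chill's ProtobufSerializer handles com.google.protobuf.Message subclasses, not plain case classes, so registering it for `Record` is unlikely to help. If protobuf is the goal, the registered type itself would need to be generated from a .proto schema, roughly:)

```scala
import com.twitter.chill.protobuf.ProtobufSerializer

// Hedged sketch: RecordProto is a hypothetical class generated from a .proto
// schema; ProtobufSerializer only works for com.google.protobuf.Message types.
env.getConfig.registerTypeWithKryoSerializer(
  classOf[RecordProto], classOf[ProtobufSerializer])
```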
>>>>>
>>>>> Thanks,
>>>>> Timur
>>>>
>>>>
>>>
>>
>





--------------------------
Ken Krugler
+1 530-210-6378
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr





Re: "No more bytes left" at deserialization

Till Rohrmann

Hi Timur,

could you try to exclude the older Kryo dependency that twitter.carbonite pulls in, via

<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>carbonite</artifactId>
    <version>1.4.0</version>
    <exclusions>
        <exclusion>
            <artifactId>kryo</artifactId>
            <groupId>com.esotericsoftware.kryo</groupId>
        </exclusion>
    </exclusions>
</dependency>

and check whether this solves your problem. If the problem persists, could you share your pom file with us?

Cheers,
Till
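
(For sbt-based builds, a sketch of the equivalent exclusion, reusing the coordinates from the Maven snippet above:)

```scala
// build.sbt sketch: exclude carbonite's transitive Kryo, mirroring the Maven
// exclusion above.
libraryDependencies +=
  "com.twitter" % "carbonite" % "1.4.0" exclude("com.esotericsoftware.kryo", "kryo")
```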


On Wed, Apr 27, 2016 at 8:34 AM, Timur Fayruzov <[hidden email]> wrote:
Hi Ken,

Good point, thanks for pointing this out. In the Flink project I see a direct dependency on Kryo 2.24, and a transitive dependency on Kryo 2.21 through twitter.carbonite. Also, twitter.chill, which as far as I understand is used to manage Kryo, shows up in two versions: 0.7.4 (from Flink) and 0.3.5 (from carbonite again). I wonder if this could cause issues: if the classes are not deduplicated, the class loading order could differ between machines. The Maven project setup is fairly complicated and I'm not a Maven expert, so I would appreciate a second look at it.

Thanks,
Timur
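
(One way to see which Kryo versions actually land on the classpath is Maven's standard dependency:tree goal, filtered to the Kryo group:)

```
mvn dependency:tree -Dincludes=com.esotericsoftware.kryo
```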


On Tue, Apr 26, 2016 at 6:51 PM, Ken Krugler <[hidden email]> wrote:
I don’t know if this is helpful, but I’d run into a similar issue (array index out of bounds during Kryo deserialization) due to having a different version of Kryo on the classpath.

— Ken

On Apr 26, 2016, at 6:23pm, Timur Fayruzov <[hidden email]> wrote:

I built master with Scala 2.11 and Hadoop 2.7.1, and now get a different exception (still serialization-related, though):

java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:162)) -> Filter (Filter at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:163))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 97, Size: 11
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 97, Size: 11
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 97, Size: 11
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.lang.IndexOutOfBoundsException: Index: 97, Size: 11
at java.util.ArrayList.rangeCheck(ArrayList.java:653)
at java.util.ArrayList.get(ArrayList.java:429)
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:75)
at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:28)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:163)
at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
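
(To take the network and the sorter out of the equation, here is a hedged, self-contained round trip through the same serializer stack as in the trace: CaseClassSerializer over OptionSerializer over KryoSerializer. It assumes the thread's `Record` class is on the classpath and uses the DataOutputViewStreamWrapper/DataInputViewStreamWrapper helpers from flink-core; `sample` stands in for one real record.)

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.scala._
import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}

// Hedged sketch: serialize one Record and read it back through the exact
// serializer Flink would use for a DataSet[Record], with no network involved.
def roundTrip(sample: Record): Unit = {
  val serializer = createTypeInformation[Record].createSerializer(new ExecutionConfig)
  val out = new ByteArrayOutputStream()
  serializer.serialize(sample, new DataOutputViewStreamWrapper(out))
  val in = new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray))
  val copy = serializer.deserialize(in)
  assert(copy == sample, s"round trip changed the record: $copy")
}
```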


