Hi,

I am using flink-1.0.0 and have been running ETL (batch) jobs on it for quite some time (a few months) without any problem. Starting this morning, I have been getting errors like this:

"Received an event in channel 3 while still having data from a record. This indicates broken serialization logic. If you are using custom serialization code (Writable or Value types), check their serialization routines. In the case of Kryo, check the respective Kryo serializer."

My datasets are in Avro format. The only thing that changed today is that I moved to a smaller cluster. When I first ran the ETL jobs there, they failed with this error:

"Insufficient number of network buffers: required 20, but only 10 available. The total number of network buffers is currently set to 20000. You can increase this number by setting the configuration key 'taskmanager.network.numberOfBuffers'"

I increased the number of buffers to 30k and also decreased the number of slots per machine, as I noticed the load per machine was too high. After that, when I restart the jobs, I get the above error.

Can someone please help me debug it?

Thank you,
Tarandeep
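For reference, both settings changed here live in flink-conf.yaml. The values below mirror the numbers mentioned in this message, except the slot count, which is an assumption since the message only says it was decreased:

    # flink-conf.yaml (excerpt)
    # global pool of network buffers shared by all channels on a TaskManager
    taskmanager.network.numberOfBuffers: 30000
    # fewer slots per TaskManager to reduce per-machine load
    taskmanager.numberOfTaskSlots: 2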
Now, when I ran it again (with fewer task slots per machine) I got a different error:
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
    at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
    ....
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: javaec40-d994-yteBuffer
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: javaec40-d994-yteBuffer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
    ... 15 more

-Tarandeep

On Mon, Oct 3, 2016 at 12:49 PM, Tarandeep Singh <[hidden email]> wrote:
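Note the mangled class name "javaec40-d994-yteBuffer": the readName frame in the trace shows Kryo reading a class name as a string from the stream, so a corrupted or misaligned stream surfaces as an unresolvable garbage name like this. A hedged sketch of one common mitigation, registering types up front so Kryo writes a compact integer tag instead of the class-name string; MyAvroRecord is a hypothetical placeholder for the job's actual record type:

    import org.apache.flink.api.java.ExecutionEnvironment;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // Registered classes are serialized with a small integer id rather than a
    // full class-name string. This does not fix stream corruption, but it
    // removes the class-name strings that show up garbled in the error above
    // (corruption would instead fail on an unknown tag).
    env.getConfig().registerKryoType(MyAvroRecord.class); // MyAvroRecord is hypothetical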
I think you're running into the same exception I face sometimes. I've opened a JIRA for it [1]. Could you please try to apply that patch and see if things get better?

[1] https://issues.apache.org/jira/plugins/servlet/mobile#issue/FLINK-4719

Best,
Flavio

On 3 Oct 2016 22:09, "Tarandeep Singh" <[hidden email]> wrote:
Hi Tarandeep,

it would be great if you could compile a small example data set with which you're able to reproduce your problem. We could then try to debug it. It would also be interesting to know whether the fix for Flavio's bug solves your problem or not.

Cheers,
Till

On Mon, Oct 3, 2016 at 10:26 PM, Flavio Pompermaier <[hidden email]> wrote:
It would be great to know if this only occurs in setups where Netty is involved (more than one TaskManager, and at least one shuffle/rebalance) or also in one-TaskManager setups (which have local channels only).

Stephan

On Tue, Oct 4, 2016 at 11:49 AM, Till Rohrmann <[hidden email]> wrote:
@Tarandeep and Flavio: +1 to Stephan's question.
Furthermore, I've created a branch which adds a simple CRC32 checksum calculation over the network buffer content here: https://github.com/uce/flink/tree/checksum

It would be great if you could run your job with a build from this branch. It's based on the current 1.1 release branch. If you need help building and running from this branch, feel free to ping me.

    git clone https://github.com/uce/flink.git flink-uce
    cd flink-uce
    git checkout -b checksum origin/checksum
    mvn clean install -DskipTests

The built binary distro is found in flink-dist/target/flink-1.1-SNAPSHOT-bin/flink-1.1-SNAPSHOT. Just copy your config files to the conf dir there and start Flink as usual in that directory.

If the checksums don't match, the job should fail with an Exception. If this happens, it is likely that the problems are caused by data corruption on the network layer. If not, it's more likely that there is something off with the Kryo serializers.

(You can also follow this guide depending on your Hadoop requirements: https://ci.apache.org/projects/flink/flink-docs-master/setup/building.html#hadoop-versions)

On Tue, Oct 4, 2016 at 7:10 PM, Stephan Ewen <[hidden email]> wrote:
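A minimal sketch of the idea behind the checksum branch, not the branch's actual code: compute a CRC32 over each buffer's bytes on the sender side, ship the value alongside the buffer, and recompute on the receiver side. A mismatch pins the corruption on the transport rather than on the serializers:

    import java.util.zip.CRC32;

    final class BufferChecksum {
        // Compute a CRC32 checksum over a slice of a byte array.
        static long checksum(byte[] data, int offset, int length) {
            CRC32 crc = new CRC32();
            crc.update(data, offset, length);
            return crc.getValue();
        }

        // Receiver side: recompute and compare against the shipped value.
        static void verify(byte[] data, int offset, int length, long expected) {
            long actual = checksum(data, offset, length);
            if (actual != expected) {
                throw new IllegalStateException(
                    "Checksum mismatch: buffer was corrupted in transit");
            }
        }
    }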
@Stephan my flink cluster setup: 5 nodes, each running 1 TaskManager. Slots per task manager: 2-4 (I tried varying this to see if this has any impact). Network buffers: 5k-20k (tried different values for it).

@Ufuk: Thank you for creating a branch with checksums. I will use this build to test the jobs and post here what I learn.

On Wed, Oct 5, 2016 at 4:53 AM, Ufuk Celebi <[hidden email]> wrote:
On Wed, Oct 5, 2016 at 7:08 PM, Tarandeep Singh <[hidden email]> wrote:
> @Stephan my flink cluster setup: 5 nodes, each running 1 TaskManager.
> Slots per task manager: 2-4. Network buffers: 5k-20k.

Could you run the job first on a single task manager to see if the error occurs even if no network shuffle is involved? That should be less overhead for you than running the custom build (which might be buggy ;)).

– Ufuk
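One hedged way to approximate this test without reconfiguring the cluster, assuming the job can run on a sample of the data: execute it in a local environment, where everything runs in one JVM over local channels and Netty is out of the picture. The parallelism value is illustrative:

    import org.apache.flink.api.java.ExecutionEnvironment;

    // Single-JVM execution with local channels only; if the serialization
    // error disappears here, the network path becomes the prime suspect.
    ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(4);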
The problem is that the data is very large and usually cannot run on a single machine :(

On Thu, Oct 6, 2016 at 10:11 AM, Ufuk Celebi <[hidden email]> wrote:
Yes, if that's the case you should go with option (2) and run with the checksums I think.

On Thu, Oct 6, 2016 at 10:32 AM, Flavio Pompermaier <[hidden email]> wrote:
I've run the job once more (always using the checksum branch) and this time I got:

Caused by: java.lang.ArrayIndexOutOfBoundsException: 1953786112
    at org.apache.flink.api.common.typeutils.base.EnumSerializer.deserialize(EnumSerializer.java:83)
    at org.apache.flink.api.common.typeutils.base.EnumSerializer.deserialize(EnumSerializer.java:32)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
    at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
    at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
    at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:45)
    at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130)
    at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
    at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
    at org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:64)
    at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
    at java.lang.Thread.run(Thread.java:745)

On Thu, Oct 6, 2016 at 11:00 AM, Ufuk Celebi <[hidden email]> wrote:
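For context, a hedged sketch of what an enum serializer typically does on read, not Flink's exact code: it reads an int ordinal and indexes into the enum's values array, so corrupted bytes surface directly as an out-of-range index like the 1953786112 above:

    import java.io.DataInput;
    import java.io.IOException;

    final class EnumReadSketch {
        // With corrupt input, in.readInt() returns garbage and values[ordinal]
        // throws ArrayIndexOutOfBoundsException, matching the failure above.
        // The bounds check here is purely illustrative; the failing code path
        // evidently indexes without one.
        static <T extends Enum<T>> T readEnum(DataInput in, T[] values) throws IOException {
            int ordinal = in.readInt();
            if (ordinal < 0 || ordinal >= values.length) {
                throw new IOException("Corrupt enum ordinal: " + ordinal);
            }
            return values[ordinal];
        }
    }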
I guess that this is caused by a bug in the checksum calculation. Let me check that.

On Thu, Oct 6, 2016 at 1:24 PM, Flavio Pompermaier <[hidden email]> wrote:
Hi Ufuk,
any news on this?

On Thu, Oct 6, 2016 at 1:30 PM, Ufuk Celebi <[hidden email]> wrote:
No, sorry. I was waiting for Tarandeep's feedback before looking into it further. I will do it over the next days in any case.

On Wed, Oct 12, 2016 at 10:49 AM, Flavio Pompermaier <[hidden email]> wrote:
Ok, thanks for the update Ufuk! Let me know if you need tests or anything!

Best,
Flavio

On Wed, Oct 12, 2016 at 11:26 AM, Ufuk Celebi <[hidden email]> wrote: