Hello,
I am trying to use HyperLogLog from stream-lib (https://github.com/addthis/stream-lib) in my Flink streaming job, but when I submit the job, I get the following error. My Flink version is 1.0.1.

---
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
(...)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
(...)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:209)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:186)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Read size does not match expected size.
    at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:291)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.readObject(WindowOperator.java:182)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:194)
    ... 3 more
---

It looks like a serialization/deserialization problem. Does the length written by StateDescriptor.writeObject() differ from the length actually read in readObject()? I have no idea why this happens.

The code is this:

---
import org.apache.flink.streaming.api.scala._
import com.clearspring.analytics.stream.cardinality.HyperLogLog

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.fromElements((1, "a"), (1, "b"), (1, "c"), (1, "d"))

// Estimate the number of distinct strings per key in tumbling windows of 3 elements
val cardinalities = stream
  .keyBy(0)
  .countWindow(3)
  .fold(new HyperLogLog(20)) { (r, i) => r.offer(i._2); r }
  .map { x => x.cardinality() }

// Print the windowed estimates (printing `stream` would only echo the input)
cardinalities.print()

env.execute("HLLTest")
---

Any help would be appreciated.

Regards,
Hironori
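The error text suggests that readObject() reads a length prefix and then compares it with the number of bytes returned by a single read call. Assuming that is what happens here (this thread does not say what FLINK-3760 actually changed), the failure would be expected: `InputStream.read(byte[])`, including on an `ObjectInputStream`, may legally return fewer bytes than requested, while `DataInput.readFully` keeps reading until the buffer is full or throws `EOFException`. A large fold initial value such as `new HyperLogLog(20)` would make a short read more likely. The following Scala sketch reproduces the pattern outside Flink; `LengthPrefixedSketch`, `ChunkedInputStream`, `readOnce` and `readAll` are hypothetical names, and the chunked stream only simulates a stream that hands out data in pieces.

```scala
import java.io.{ByteArrayOutputStream, DataInputStream, DataOutputStream, FilterInputStream, InputStream}

// Hypothetical illustration, not Flink's StateDescriptor code.
object LengthPrefixedSketch {

  // Writes a 4-byte length prefix followed by the payload bytes.
  def write(payload: Array[Byte]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeInt(payload.length)
    out.write(payload)
    out.flush()
    bytes.toByteArray
  }

  // Wraps a stream so that each bulk read returns at most `chunk` bytes,
  // simulating a stream that delivers its data in pieces.
  final class ChunkedInputStream(underlying: InputStream, chunk: Int)
      extends FilterInputStream(underlying) {
    require(chunk > 0, "chunk must be positive")
    override def read(b: Array[Byte], off: Int, len: Int): Int =
      super.read(b, off, math.min(len, chunk))
  }

  // Fragile pattern: a single read() may return fewer bytes than requested,
  // so the size check can fail even though all bytes were written.
  def readOnce(in: DataInputStream): Either[String, Array[Byte]] = {
    val size = in.readInt()
    val buffer = new Array[Byte](size)
    val n = in.read(buffer)
    if (n != size) Left("Read size does not match expected size.") else Right(buffer)
  }

  // Robust pattern: readFully loops until `size` bytes are read
  // (or throws EOFException if the stream ends early).
  def readAll(in: DataInputStream): Array[Byte] = {
    val size = in.readInt()
    val buffer = new Array[Byte](size)
    in.readFully(buffer)
    buffer
  }
}
```

Reading a 100-byte payload through 16-byte chunks, `readOnce` fails with the same "Read size does not match expected size." message, while `readAll` returns the complete payload.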
Hi,

I'm afraid you found a bug. I created a Jira issue: https://issues.apache.org/jira/browse/FLINK-3760. I already have a fix and hope we'll get it into the 1.0.2 release, which we are just about to publish.

Cheers,
Aljoscha

On Wed, 13 Apr 2016 at 07:25 Hironori Ogibayashi wrote:
Aljoscha,
Thanks for the fix. I tried the recent master branch and it worked!

Hironori

2016-04-14 23:03 GMT+09:00 Aljoscha Krettek wrote: