Hello,
I am trying to use Jedis (the Redis Java client) with the Flink streaming API, but I get an exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:452)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
    at org.apache.flink.client.program.Client.run(Client.java:278)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:631)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:319)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:954)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1004)
Caused by: org.apache.flink.api.common.InvalidProgramException: Object flink.benchmark.AdvertisingTopologyNative$RedisJoinBolt@21e360a not serializable
    at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1320)
    at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:144)
    at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:624)
    at flink.benchmark.AdvertisingTopologyNative.main(AdvertisingTopologyNative.java:50)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
    ... 6 more
Caused by: java.io.NotSerializableException: redis.clients.jedis.Jedis
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:306)
    at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
    ... 16 more

The code I am using:

public static class RedisJoinBolt implements FlatMapFunction<Tuple5<String, String,String,String,String> . . .

Does anyone know a fix for this?
Maybe wrapping Jedis in a serializable class will do the trick? But in general, is there a way to reference jar classes in Flink apps without serializing them?
Hey Jerry,

Jay is on the right track: this issue has to do with the Flink operator life cycle. When you hit execute, all your user-defined function objects get serialized so that they can be shipped to the workers on the cluster. That is why a non-serializable field such as Jedis triggers the NotSerializableException. To execute some code before your FlatMapFunction starts processing the data, you can use the open() function of the RichFlatMapFunction. Creating the Jedis client there lets you make the Jedis attribute transient, so it is skipped during serialization. Note that RichFlatMapFunction is an abstract class, so it is extended rather than implemented:

public static class RedisJoinBolt extends RichFlatMapFunction<Tuple5<String, String,String,String,String>

On Fri, Sep 4, 2015 at 8:11 PM, Jay Vyas <[hidden email]> wrote:
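To illustrate the transient-plus-open() idea without needing Flink or Jedis on the classpath, here is a minimal plain-Java sketch. Everything in it is a hypothetical stand-in: FakeClient plays the role of Jedis (it is deliberately not Serializable), RichFunctionStandIn plays the role of RichFlatMapFunction, and ship() mimics the function object being serialized on the client and deserialized on a worker. It is not Flink's actual machinery, just the same Java serialization behaviour in isolation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class TransientClientDemo {

    // Hypothetical stand-in for redis.clients.jedis.Jedis: NOT Serializable.
    public static class FakeClient {
        public String get(String key) {
            return "value-for-" + key;
        }
    }

    // Hypothetical stand-in for Flink's RichFlatMapFunction:
    // a serializable function with an open() hook called before processing.
    public abstract static class RichFunctionStandIn implements Serializable {
        private static final long serialVersionUID = 1L;

        public void open() {
        }

        public abstract void flatMap(String in, List<String> out);
    }

    // Works: the client is transient, so serialization skips it,
    // and open() creates it after the function has been shipped.
    public static class RedisJoin extends RichFunctionStandIn {
        private static final long serialVersionUID = 1L;
        private transient FakeClient client;

        @Override
        public void open() {
            client = new FakeClient();
        }

        @Override
        public void flatMap(String in, List<String> out) {
            out.add(client.get(in));
        }
    }

    // Fails: the client is a regular field created eagerly,
    // so serialization tries to write it and gives up.
    public static class BrokenJoin extends RichFunctionStandIn {
        private static final long serialVersionUID = 1L;
        private final FakeClient client = new FakeClient();

        @Override
        public void flatMap(String in, List<String> out) {
            out.add(client.get(in));
        }
    }

    // Serializes and deserializes a function object, imitating the
    // client-to-worker hand-off. Failures are rethrown unchecked.
    public static <T extends Serializable> T ship(T fn) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(fn);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("could not ship function", e);
        }
    }

    public static void main(String[] args) {
        RedisJoin onWorker = ship(new RedisJoin());
        onWorker.open(); // the client is created here, after shipping
        List<String> out = new ArrayList<>();
        onWorker.flatMap("ad-1", out);
        System.out.println(out);

        try {
            ship(new BrokenJoin());
        } catch (IllegalStateException e) {
            System.out.println("BrokenJoin failed: " + e.getCause());
        }
    }
}
```

In an actual Flink job of this era the hook would be open(Configuration parameters) on a class extending RichFlatMapFunction, with the real Jedis connection created inside it. Closing the connection in the matching close() method is a reasonable companion step, though that is beyond what the thread discusses.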