Using Flink with Redis question


Jerry Peng
Hello,

I am trying to use Jedis (the Redis Java client) with the Flink streaming API, but I get an exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:452)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
    at org.apache.flink.client.program.Client.run(Client.java:278)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:631)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:319)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:954)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1004)
Caused by: org.apache.flink.api.common.InvalidProgramException: Object flink.benchmark.AdvertisingTopologyNative$RedisJoinBolt@21e360a not serializable
    at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1320)
    at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:144)
    at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:624)
    at flink.benchmark.AdvertisingTopologyNative.main(AdvertisingTopologyNative.java:50)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
    ... 6 more
Caused by: java.io.NotSerializableException: redis.clients.jedis.Jedis
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:306)
    at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
    ... 16 more

Here is the code I am using:


public static class RedisJoinBolt implements FlatMapFunction<Tuple5<String, String, String, String, String>,
        Tuple6<String, String, String, String, String, String>> {
    private Jedis jedis;
    private HashMap<String, String> ad_to_campaign;

    public RedisJoinBolt(String jedisServer) {
        // initialize jedis
        this.jedis = new Jedis(jedisServer);
    }

    @Override
    public void flatMap(Tuple5<String, String, String, String, String> input,
            Collector<Tuple6<String, String, String, String, String, String>> out) throws Exception {
        .
        .
        .

Does anyone know a fix for this?

Re: Using Flink with Redis question

jay vyas
Maybe wrapping Jedis with a serializable class will do the trick?

But in general, is there a way to reference classes from a jar in Flink apps without serializing them?
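
A minimal sketch of the wrapper idea, using only the JDK so it runs without Flink or Jedis on the classpath. FakeClient is a hypothetical stand-in for a non-serializable client such as Jedis, and ClientHolder and roundTrip are illustrative names, not part of any library. The holder serializes only the host string and rebuilds the client on first use after it has been shipped:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableWrapperSketch {

    /** Hypothetical stand-in for a non-serializable client such as Jedis. */
    static class FakeClient {
        final String host;
        FakeClient(String host) { this.host = host; }
    }

    /** Serializable holder: only the host string travels; the client is rebuilt lazily. */
    static class ClientHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String host;
        private transient FakeClient client; // skipped by Java serialization

        ClientHolder(String host) { this.host = host; }

        /** Creates the client on first use (not thread-safe; assumes one caller thread). */
        FakeClient get() {
            if (client == null) {
                client = new FakeClient(host);
            }
            return client;
        }

        boolean isConnected() { return client != null; }
    }

    /** Serializes and deserializes the holder, as shipping it to a worker would. */
    static ClientHolder roundTrip(ClientHolder holder) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(holder);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ClientHolder) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ClientHolder holder = new ClientHolder("localhost");
        holder.get(); // client created on the submitting side
        ClientHolder shipped = roundTrip(holder);
        System.out.println(shipped.isConnected()); // false: the client was not shipped
        System.out.println(shipped.get().host);    // localhost: rebuilt on first use
    }
}
```

A function could then keep a ClientHolder as an ordinary field, since the holder itself passes the serialization check.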


On Sep 4, 2015, at 1:36 PM, Jerry Peng <[hidden email]> wrote:

Re: Using Flink with Redis question

Márton Balassi
Hey Jerry,

Jay is on the right track. This issue has to do with the Flink operator life cycle: when you execute the program, all your user-defined functions are serialized so that they can be shipped to the workers on the cluster (in your trace, the check already fires inside flatMap(), via ClosureCleaner.ensureSerializable). To run some code before your FlatMapFunction starts processing data, use the open() method of RichFlatMapFunction. That lets you make the Jedis field transient and create the client on the worker instead of in the constructor:

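import java.util.HashMap;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import redis.clients.jedis.Jedis;

// RichFlatMapFunction is an abstract class, so it is extended, not implemented
public static class RedisJoinBolt extends RichFlatMapFunction<Tuple5<String, String, String, String, String>,
        Tuple6<String, String, String, String, String, String>> {
    // transient: skipped when the function object is serialized
    private transient Jedis jedis;
    // only the server address (a String) is shipped to the workers
    private String jedisServer;
    private HashMap<String, String> ad_to_campaign;

    public RedisJoinBolt(String jedisServer) {
        // store the address only; do not connect here
        this.jedisServer = jedisServer;
    }

    @Override
    public void open(Configuration parameters) {
        // initialize jedis on the worker, after deserialization
        this.jedis = new Jedis(jedisServer);
    }

    @Override
    public void flatMap(Tuple5<String, String, String, String, String> input,
            Collector<Tuple6<String, String, String, String, String, String>> out) throws Exception {
        // ... rest as before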
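
To see the mechanism in isolation, here is a small sketch that needs only the JDK. It assumes the check behaves like plain Java serialization, which is what the ObjectOutputStream frames in the stack trace suggest. FakeJedis, EagerFunction, LazyFunction and the helper methods are hypothetical names for illustration; open() here is an ordinary method standing in for the RichFunction hook:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldSketch {

    /** Hypothetical stand-in for redis.clients.jedis.Jedis: not Serializable. */
    static class FakeJedis {
        final String host;
        FakeJedis(String host) { this.host = host; }
    }

    /** Mirrors the failing version: the client is created in the constructor. */
    static class EagerFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        private final FakeJedis jedis;
        EagerFunction(String host) { this.jedis = new FakeJedis(host); }
    }

    /** Mirrors the fix: only the host is serialized; open() creates the client. */
    static class LazyFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String host;
        private transient FakeJedis jedis;
        LazyFunction(String host) { this.host = host; }
        void open() { this.jedis = new FakeJedis(host); }
        boolean isOpen() { return jedis != null; }
    }

    /** Returns true if Java serialization accepts the object. */
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Serializes and deserializes an object, as shipping it to a worker would. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T o) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(o);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new EagerFunction("localhost"))); // false
        LazyFunction shipped = roundTrip(new LazyFunction("localhost"));
        System.out.println(shipped.isOpen()); // false: transient field arrives as null
        shipped.open();                       // what open() does on the worker
        System.out.println(shipped.isOpen()); // true
    }
}
```

The transient field arrives as null on the receiving side, which is why the client has to be created in open() rather than in the constructor.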

On Fri, Sep 4, 2015 at 8:11 PM, Jay Vyas <[hidden email]> wrote:
Maybe wrapping Jedis with a serializable class will do the trick?

But in general, is there a way to reference classes from a jar in Flink apps without serializing them?

