Correct way to handle RedisSink exception

Correct way to handle RedisSink exception

Manas Kale
Hi,
I have a streaming application that pushes output to a Redis cluster sink. I am using the Apache Bahir[1] Flink Redis connector for this. I want to handle the case when the Redis server is unavailable.
I am following the same pattern as outlined by them in [1]:
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
    .setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build();

DataStream<Tuple2<String, String>> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));
However, if the Redis server is not available, my whole job crashes with this exception:

ERROR org.apache.flink.streaming.connectors.redis.RedisSink         - Redis has not been properly initialized:
redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
at redis.clients.util.Pool.getResource(Pool.java:53)
at redis.clients.jedis.JedisPool.getResource(JedisPool.java:226)
...

I want to handle and ignore such exceptions thrown by the RedisSink class. Where exactly do I put my try/catch to do this? Enclosing the last line of the code snippet in a try/catch does not work.
I believe the only way to do this would be to handle the exception in the RedisSink class, but that is a library class provided by Bahir. Is my thinking correct?


Regards,
Manas
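
For context on why the try/catch around addSink has no effect: addSink only registers the sink in the job graph, and the sink's open() runs later, once the job is actually executed. Below is a minimal, framework-free sketch of that deferred behaviour. MiniPipeline and DeferredFailureDemo are hypothetical stand-ins invented for illustration; they are not Flink or Bahir classes.

```java
// Hypothetical stand-in for a streaming environment; NOT the Flink API.
class MiniPipeline {
    private Runnable sinkOpen;   // deferred work: recorded here, not run

    // Like addSink: only records the sink, never contacts the server.
    void addSink(Runnable open) { this.sinkOpen = open; }

    // Like executing the job: this is where the sink is actually opened.
    void execute() { sinkOpen.run(); }
}

class DeferredFailureDemo {
    // Returns where the failure surfaced: "addSink", "execute", or "none".
    static String whereDoesItFail() {
        MiniPipeline pipeline = new MiniPipeline();
        try {
            pipeline.addSink(() -> {
                throw new IllegalStateException("Could not get a resource from the pool");
            });
        } catch (IllegalStateException e) {
            return "addSink";    // never reached: nothing has run yet
        }
        try {
            pipeline.execute();
        } catch (IllegalStateException e) {
            return "execute";
        }
        return "none";
    }

    public static void main(String[] args) {
        System.out.println(whereDoesItFail()); // prints "execute"
    }
}
```

In a real job, catching the exception around the execute call would not keep the job running either, since the job has already failed by then. That is why the exception has to be handled inside the sink itself.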

Re: Correct way to handle RedisSink exception

Chesnay Schepler
You will have to create a custom version of the Redis connector that ignores such exceptions.

On 10/15/2020 1:27 PM, Manas Kale wrote:
I want to handle and ignore such exceptions thrown by the RedisSink class. Where exactly do I put my try/catch to do this?



Re: Correct way to handle RedisSink exception

阮树斌 浙江大学
In reply to this post by Manas Kale
Hello, Manas Kale.

The log shows that the exception was thrown in the open() method of the RedisSink class. You can subclass RedisSink, override the open() method, and handle the exception as you wish. Alternatively, stop using the Apache Bahir[1] Flink Redis connector library and extend RichSinkFunction to develop a custom RedisSink class.

Regards
Shubin Ruan
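
The subclass-and-override approach described above might look roughly like the sketch below. It is deliberately framework-free so that it runs on a plain JDK: LibrarySink is a hypothetical stand-in for a library sink such as RedisSink, and TolerantSink is the subclass. Neither class is part of Flink or Bahir, and the real open() and invoke() signatures differ. The sketch also assumes that invoke() has to be guarded, since writes would presumably fail as well once open() has failed.

```java
// Hypothetical stand-in for a library sink such as RedisSink: open() connects,
// invoke() writes one record. This is NOT the Flink or Bahir API.
class LibrarySink {
    private final boolean serverUp;
    private boolean connected;
    int written;

    LibrarySink(boolean serverUp) { this.serverUp = serverUp; }

    // Mirrors the failure in the stack trace: connecting throws when the server is down.
    void open() {
        if (!serverUp) {
            throw new IllegalStateException("Could not get a resource from the pool");
        }
        connected = true;
    }

    void invoke(String record) {
        if (!connected) {
            throw new IllegalStateException("not connected");
        }
        written++;
    }
}

// Subclass that overrides open() and invoke() so that a connection failure
// no longer propagates; records are counted as dropped instead.
class TolerantSink extends LibrarySink {
    private boolean available;
    int dropped;

    TolerantSink(boolean serverUp) { super(serverUp); }

    @Override
    void open() {
        try {
            super.open();
            available = true;
        } catch (RuntimeException e) {
            // Swallow the failure and remember that the sink is unusable.
            available = false;
        }
    }

    @Override
    void invoke(String record) {
        if (!available) {
            dropped++;   // drop the record instead of failing the job
            return;
        }
        super.invoke(record);
    }
}

class TolerantSinkDemo {
    public static void main(String[] args) {
        TolerantSink sink = new TolerantSink(false); // simulate an unreachable server
        sink.open();
        sink.invoke("k1");
        sink.invoke("k2");
        System.out.println("written=" + sink.written + " dropped=" + sink.dropped);
        // prints "written=0 dropped=2"
    }
}
```

Silently dropping records is only one possible policy. Logging the failure or retrying the connection on a later invoke() would follow the same structure.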

At 2020-10-15 19:27:29, "Manas Kale" wrote:
I want to handle and ignore such exceptions thrown by the RedisSink class. Where exactly do I put my try/catch to do this?


Re: Correct way to handle RedisSink exception

Manas Kale
Hi all,
Thank you for the help, I understand now.

On Thu, Oct 15, 2020 at 5:28 PM 阮树斌 浙江大学 wrote:
You can subclass RedisSink, override the open() method, and handle the exception as you wish.