Flink streaming-job with redis sink: SocketTimeoutException

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink streaming-job with redis sink: SocketTimeoutException

Marke Builder
Hi,

what can be the reasons for the following exceptions.
We are using flink with a redis sink, but from time to time the flink job failed with the follwing excpetions.

Thanks, Builder.


10/13/2018 15:37:48     Flat Map -> (Sink: Unnamed, Sink: Unnamed)(9/10) switched to FAILED
redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out
        at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:201)
        at redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:40)
        at redis.clients.jedis.Protocol.process(Protocol.java:141)
        at redis.clients.jedis.Protocol.read(Protocol.java:205)
        at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:297)
        at redis.clients.jedis.Connection.getStatusCodeReply(Connection.java:196)
        at redis.clients.jedis.Jedis.set(Jedis.java:69)
        at org.apache.flink.streaming.connectors.redis.common.container.RedisContainer.set(RedisContainer.java:178)
        at org.apache.flink.streaming.connectors.redis.RedisSink.invoke(RedisSink.java:143)
        at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:611)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:572)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at com.voith.cloud.cache.operators.FlatMapMeasurements.flatMap(FlatMapMeasurements.java:27)
        at com.voith.cloud.cache.operators.FlatMapMeasurements.flatMap(FlatMapMeasurements.java:12)
        at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: Read timed out
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
        at java.net.SocketInputStream.read(SocketInputStream.java:171)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at java.net.SocketInputStream.read(SocketInputStream.java:127)
        at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:195)
        ... 26 more

Reply | Threaded
Open this post in threaded view
|

Re: Flink streaming-job with redis sink: SocketTimeoutException

Amit Jain
Hi Marke,

Stacktrace suggests it is more of a Redis connection issue rather than something with Flink. Could you share JedisPool configuration of Redis sink? Are you writing into Redis in continuity or some bulk logic? Looks like Redis connections are getting timeout here.  

--
Thanks,
Amit

On Mon, Oct 15, 2018 at 12:19 PM Marke Builder <[hidden email]> wrote:
Hi,

what can be the reasons for the following exceptions.
We are using flink with a redis sink, but from time to time the flink job failed with the follwing excpetions.

Thanks, Builder.


10/13/2018 15:37:48     Flat Map -> (Sink: Unnamed, Sink: Unnamed)(9/10) switched to FAILED
redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out
        at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:201)
        at redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:40)
        at redis.clients.jedis.Protocol.process(Protocol.java:141)
        at redis.clients.jedis.Protocol.read(Protocol.java:205)
        at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:297)
        at redis.clients.jedis.Connection.getStatusCodeReply(Connection.java:196)
        at redis.clients.jedis.Jedis.set(Jedis.java:69)
        at org.apache.flink.streaming.connectors.redis.common.container.RedisContainer.set(RedisContainer.java:178)
        at org.apache.flink.streaming.connectors.redis.RedisSink.invoke(RedisSink.java:143)
        at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:611)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:572)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at com.voith.cloud.cache.operators.FlatMapMeasurements.flatMap(FlatMapMeasurements.java:27)
        at com.voith.cloud.cache.operators.FlatMapMeasurements.flatMap(FlatMapMeasurements.java:12)
        at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: Read timed out
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
        at java.net.SocketInputStream.read(SocketInputStream.java:171)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at java.net.SocketInputStream.read(SocketInputStream.java:127)
        at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:195)
        ... 26 more