RemoteTransportException when trying to redis in flink code

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

RemoteTransportException when trying to redis in flink code

Balaji Rajagopalan

I am trying to use AWS EMR yarn cluster where the flink code runs, in one of apply window function, I try to set some values in redis it fails. I have tried to access the same redis with no flink code and get/set works, but from the flink I get  into this exception. Any inputs on what might be going wrong. 

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Error at remote task manager 'some-ip'.

at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:241)

at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:164)

at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)

at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)

at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)

at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)

at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)

at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)

at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)

at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)

at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)

at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)

at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)

at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)

at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)

at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)

at java.lang.Thread.run(Thread.java:745)

Caused by: org.apache.flink.runtime.io.network.partition.ProducerFailedException

at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:164)

at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:96)

at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)

at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)

at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)

at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)

at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)

at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)

at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at io.netty.channel.AbstractChannelHandlerContext.access$500(AbstractChannelHandlerContext.java:32)

at io.netty.channel.AbstractChannelHandlerContext$6.run(AbstractChannelHandlerContext.java:299)

at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)

... 2 more


Caused by: java.lang.RuntimeException: java.net.ConnectException: Connection refused

        at com.redis.IO$class.connect(IO.scala:37)

        at com.redis.RedisClient.connect(RedisClient.scala:94)

        at com.redis.RedisCommand$class.initialize(RedisClient.scala:71)

        at com.redis.RedisClient.initialize(RedisClient.scala:94)

        at com.redis.RedisClient.<init>(RedisClient.scala:98)

        at com.redis.RedisClientFactory.makeObject(Pool.scala:12)

        at com.redis.RedisClientFactory.makeObject(Pool.scala:7)

        at org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:149)

        at com.redis.RedisClientPool.withClient(Pool.scala:34)

        at com.olacabs.peakpricing.utils.DriverStreamHelper$.set(DriverStreamHelper.scala:57)

        at com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:35)

        at com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:29)

        at org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:312)

        at org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:583)

        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:44)

        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:33)

        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processTriggerResult(WindowOperator.java:256)

        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:287)

        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)

        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)

        at java.lang.Thread.run(Thread.java:745)

Caused by: java.net.ConnectException: Connection refused

        at java.net.PlainSocketImpl.socketConnect(Native Method)

        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)

        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)

        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)

        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)

        at java.net.Socket.connect(Socket.java:589)

        at java.net.Socket.connect(Socket.java:538)

        at java.net.Socket.<init>(Socket.java:434)

        at java.net.Socket.<init>(Socket.java:211)

Reply | Threaded
Open this post in threaded view
|

Re: RemoteTransportException when trying to redis in flink code

Till Rohrmann
Hi Balaji,

from the stack trace it looks as if you cannot open a connection redis. Have you checked that you can access redis from all your TaskManager nodes?

Cheers,
Till

On Wed, Apr 6, 2016 at 7:46 AM, Balaji Rajagopalan <[hidden email]> wrote:

I am trying to use AWS EMR yarn cluster where the flink code runs, in one of apply window function, I try to set some values in redis it fails. I have tried to access the same redis with no flink code and get/set works, but from the flink I get  into this exception. Any inputs on what might be going wrong. 

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Error at remote task manager 'some-ip'.

at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:241)

at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:164)

at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)

at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)

at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)

at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)

at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)

at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)

at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)

at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)

at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)

at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)

at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)

at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)

at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)

at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)

at java.lang.Thread.run(Thread.java:745)

Caused by: org.apache.flink.runtime.io.network.partition.ProducerFailedException

at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:164)

at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:96)

at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)

at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)

at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)

at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)

at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)

at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)

at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at io.netty.channel.AbstractChannelHandlerContext.access$500(AbstractChannelHandlerContext.java:32)

at io.netty.channel.AbstractChannelHandlerContext$6.run(AbstractChannelHandlerContext.java:299)

at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)

... 2 more


Caused by: java.lang.RuntimeException: java.net.ConnectException: Connection refused

        at com.redis.IO$class.connect(IO.scala:37)

        at com.redis.RedisClient.connect(RedisClient.scala:94)

        at com.redis.RedisCommand$class.initialize(RedisClient.scala:71)

        at com.redis.RedisClient.initialize(RedisClient.scala:94)

        at com.redis.RedisClient.<init>(RedisClient.scala:98)

        at com.redis.RedisClientFactory.makeObject(Pool.scala:12)

        at com.redis.RedisClientFactory.makeObject(Pool.scala:7)

        at org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:149)

        at com.redis.RedisClientPool.withClient(Pool.scala:34)

        at com.olacabs.peakpricing.utils.DriverStreamHelper$.set(DriverStreamHelper.scala:57)

        at com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:35)

        at com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:29)

        at org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:312)

        at org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:583)

        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:44)

        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:33)

        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processTriggerResult(WindowOperator.java:256)

        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:287)

        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)

        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)

        at java.lang.Thread.run(Thread.java:745)

Caused by: java.net.ConnectException: Connection refused

        at java.net.PlainSocketImpl.socketConnect(Native Method)

        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)

        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)

        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)

        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)

        at java.net.Socket.connect(Socket.java:589)

        at java.net.Socket.connect(Socket.java:538)

        at java.net.Socket.<init>(Socket.java:434)

        at java.net.Socket.<init>(Socket.java:211)


Reply | Threaded
Open this post in threaded view
|

Re: RemoteTransportException when trying to redis in flink code

Balaji Rajagopalan
Till,
  I have checked from all the taskmanager nodes I am able to establish a connection by installing a redis-cli on those nodes. The thing is in the constructor I am able to set and get values, also I am getting PONG for the ping. But once object is initialized when I try to call DriverStreamHelper.get and DriverStreamHelper.set from map/apply function I get the connection refused. This may not be related to flink but rather to some security setting with Amazon AWS EMR, this is assumption now. I have also tried with 3 different redis libraries to rule out any errors with libraries the same exception in all. 

object DriverStreamHelper {


implicit val akkaSystem = akka.actor.ActorSystem("flink-actorsystem")

val redisClient = RedisClient(host=redisHost, port=redisPort)

val p = redisClient.ping()
p.map{ res => LOG.info(s"Reply from Redis client : $res") }



val postFix = System.currentTimeMillis()
val key = "some-key" + postFix
val value = "some-value" + postFix
set(key, value, Some(10000L))
LOG.info(s"Going to get the value from Redis ${get(key)}")

def set(k: String, v: String): Unit = {
redisClient.set(k,v)
}
  def set(k: String, v: String, exTime: Option[Long]): Unit = {
redisClient.set(k,v,exTime)
}

def get(k: String): Option[String] = {
import scala.concurrent.duration._
val f = redisClient.get[String](k)
Await.result(f, 1.seconds) //FIXME - really bad need to return future here.
}
}

On Wed, Apr 6, 2016 at 2:42 PM, Till Rohrmann <[hidden email]> wrote:
Hi Balaji,

from the stack trace it looks as if you cannot open a connection redis. Have you checked that you can access redis from all your TaskManager nodes?

Cheers,
Till

On Wed, Apr 6, 2016 at 7:46 AM, Balaji Rajagopalan <[hidden email]> wrote:

I am trying to use AWS EMR yarn cluster where the flink code runs, in one of apply window function, I try to set some values in redis it fails. I have tried to access the same redis with no flink code and get/set works, but from the flink I get  into this exception. Any inputs on what might be going wrong. 

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Error at remote task manager 'some-ip'.

at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:241)

at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:164)

at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)

at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)

at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)

at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)

at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)

at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)

at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)

at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)

at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)

at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)

at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)

at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)

at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)

at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)

at java.lang.Thread.run(Thread.java:745)

Caused by: org.apache.flink.runtime.io.network.partition.ProducerFailedException

at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:164)

at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:96)

at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)

at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)

at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)

at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)

at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)

at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)

at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at io.netty.channel.AbstractChannelHandlerContext.access$500(AbstractChannelHandlerContext.java:32)

at io.netty.channel.AbstractChannelHandlerContext$6.run(AbstractChannelHandlerContext.java:299)

at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)

... 2 more


Caused by: java.lang.RuntimeException: java.net.ConnectException: Connection refused

        at com.redis.IO$class.connect(IO.scala:37)

        at com.redis.RedisClient.connect(RedisClient.scala:94)

        at com.redis.RedisCommand$class.initialize(RedisClient.scala:71)

        at com.redis.RedisClient.initialize(RedisClient.scala:94)

        at com.redis.RedisClient.<init>(RedisClient.scala:98)

        at com.redis.RedisClientFactory.makeObject(Pool.scala:12)

        at com.redis.RedisClientFactory.makeObject(Pool.scala:7)

        at org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:149)

        at com.redis.RedisClientPool.withClient(Pool.scala:34)

        at com.olacabs.peakpricing.utils.DriverStreamHelper$.set(DriverStreamHelper.scala:57)

        at com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:35)

        at com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:29)

        at org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:312)

        at org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:583)

        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:44)

        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:33)

        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processTriggerResult(WindowOperator.java:256)

        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:287)

        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)

        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)

        at java.lang.Thread.run(Thread.java:745)

Caused by: java.net.ConnectException: Connection refused

        at java.net.PlainSocketImpl.socketConnect(Native Method)

        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)

        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)

        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)

        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)

        at java.net.Socket.connect(Socket.java:589)

        at java.net.Socket.connect(Socket.java:538)

        at java.net.Socket.<init>(Socket.java:434)

        at java.net.Socket.<init>(Socket.java:211)



Reply | Threaded
Open this post in threaded view
|

Re: RemoteTransportException when trying to redis in flink code

Till Rohrmann
Hmm I'm not a Redis expert, but are you sure that you see a successful ping reply in the logs of the TaskManagers and not only in the client logs? 

Another thing: Is the redisClient thread safe? Multiple map tasks might be accessing the set and get methods concurrently.

Another question: The code of DriverStreamHelper you've just sent is not the code you've used when receiving the stack trace, right? Because in the stack trace it's written that you access a RedisClientPool from the DriverStreamHelper.set method.

Cheers,
Till


On Wed, Apr 6, 2016 at 11:42 AM, Balaji Rajagopalan <[hidden email]> wrote:
Till,
  I have checked from all the taskmanager nodes I am able to establish a connection by installing a redis-cli on those nodes. The thing is in the constructor I am able to set and get values, also I am getting PONG for the ping. But once object is initialized when I try to call DriverStreamHelper.get and DriverStreamHelper.set from map/apply function I get the connection refused. This may not be related to flink but rather to some security setting with Amazon AWS EMR, this is assumption now. I have also tried with 3 different redis libraries to rule out any errors with libraries the same exception in all. 

object DriverStreamHelper {


implicit val akkaSystem = akka.actor.ActorSystem("flink-actorsystem")

val redisClient = RedisClient(host=redisHost, port=redisPort)

val p = redisClient.ping()
p.map{ res => LOG.info(s"Reply from Redis client : $res") }



val postFix = System.currentTimeMillis()
val key = "some-key" + postFix
val value = "some-value" + postFix
set(key, value, Some(10000L))
LOG.info(s"Going to get the value from Redis ${get(key)}")

def set(k: String, v: String): Unit = {
redisClient.set(k,v)
}
  def set(k: String, v: String, exTime: Option[Long]): Unit = {
redisClient.set(k,v,exTime)
}

def get(k: String): Option[String] = {
import scala.concurrent.duration._
val f = redisClient.get[String](k)
Await.result(f, 1.seconds) //FIXME - really bad need to return future here.
}
}

On Wed, Apr 6, 2016 at 2:42 PM, Till Rohrmann <[hidden email]> wrote:
Hi Balaji,

from the stack trace it looks as if you cannot open a connection redis. Have you checked that you can access redis from all your TaskManager nodes?

Cheers,
Till

On Wed, Apr 6, 2016 at 7:46 AM, Balaji Rajagopalan <[hidden email]> wrote:

I am trying to use AWS EMR yarn cluster where the flink code runs, in one of apply window function, I try to set some values in redis it fails. I have tried to access the same redis with no flink code and get/set works, but from the flink I get  into this exception. Any inputs on what might be going wrong. 

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Error at remote task manager 'some-ip'.

at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:241)

at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:164)

at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)

at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)

at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)

at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)

at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)

at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)

at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)

at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)

at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)

at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)

at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)

at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)

at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)

at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)

at java.lang.Thread.run(Thread.java:745)

Caused by: org.apache.flink.runtime.io.network.partition.ProducerFailedException

at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:164)

at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:96)

at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)

at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)

at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)

at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)

at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)

at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)

at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at io.netty.channel.AbstractChannelHandlerContext.access$500(AbstractChannelHandlerContext.java:32)

at io.netty.channel.AbstractChannelHandlerContext$6.run(AbstractChannelHandlerContext.java:299)

at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)

... 2 more


Caused by: java.lang.RuntimeException: java.net.ConnectException: Connection refused

        at com.redis.IO$class.connect(IO.scala:37)

        at com.redis.RedisClient.connect(RedisClient.scala:94)

        at com.redis.RedisCommand$class.initialize(RedisClient.scala:71)

        at com.redis.RedisClient.initialize(RedisClient.scala:94)

        at com.redis.RedisClient.<init>(RedisClient.scala:98)

        at com.redis.RedisClientFactory.makeObject(Pool.scala:12)

        at com.redis.RedisClientFactory.makeObject(Pool.scala:7)

        at org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:149)

        at com.redis.RedisClientPool.withClient(Pool.scala:34)

        at com.olacabs.peakpricing.utils.DriverStreamHelper$.set(DriverStreamHelper.scala:57)

        at com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:35)

        at com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:29)

        at org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:312)

        at org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:583)

        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:44)

        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:33)

        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processTriggerResult(WindowOperator.java:256)

        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:287)

        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)

        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)

        at java.lang.Thread.run(Thread.java:745)

Caused by: java.net.ConnectException: Connection refused

        at java.net.PlainSocketImpl.socketConnect(Native Method)

        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)

        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)

        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)

        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)

        at java.net.Socket.connect(Socket.java:589)

        at java.net.Socket.connect(Socket.java:538)

        at java.net.Socket.<init>(Socket.java:434)

        at java.net.Socket.<init>(Socket.java:211)




Reply | Threaded
Open this post in threaded view
|

Re: RemoteTransportException when trying to redis in flink code

Balaji Rajagopalan
Till,
  Found the issue, it was my bad assumption about GlobalConfiguration, what I thought was once the configuration is read from the client machine GlobalConfiguration params will passed on to the task manager nodes, as well, it was not and values from default was getting pickup, which was localhost 6379 and there was no redis running in localhost of task manager. 

balaji 

On Wed, Apr 6, 2016 at 3:29 PM, Till Rohrmann <[hidden email]> wrote:
Hmm I'm not a Redis expert, but are you sure that you see a successful ping reply in the logs of the TaskManagers and not only in the client logs? 

Another thing: Is the redisClient thread safe? Multiple map tasks might be accessing the set and get methods concurrently.

Another question: The code of DriverStreamHelper you've just sent is not the code you've used when receiving the stack trace, right? Because in the stack trace it's written that you access a RedisClientPool from the DriverStreamHelper.set method.

Cheers,
Till


On Wed, Apr 6, 2016 at 11:42 AM, Balaji Rajagopalan <[hidden email]> wrote:
Till,
  I have checked from all the taskmanager nodes I am able to establish a connection by installing a redis-cli on those nodes. The thing is in the constructor I am able to set and get values, also I am getting PONG for the ping. But once object is initialized when I try to call DriverStreamHelper.get and DriverStreamHelper.set from map/apply function I get the connection refused. This may not be related to flink but rather to some security setting with Amazon AWS EMR, this is assumption now. I have also tried with 3 different redis libraries to rule out any errors with libraries the same exception in all. 

object DriverStreamHelper {


implicit val akkaSystem = akka.actor.ActorSystem("flink-actorsystem")

val redisClient = RedisClient(host=redisHost, port=redisPort)

val p = redisClient.ping()
p.map{ res => LOG.info(s"Reply from Redis client : $res") }



val postFix = System.currentTimeMillis()
val key = "some-key" + postFix
val value = "some-value" + postFix
set(key, value, Some(10000L))
LOG.info(s"Going to get the value from Redis ${get(key)}")

def set(k: String, v: String): Unit = {
redisClient.set(k,v)
}
  def set(k: String, v: String, exTime: Option[Long]): Unit = {
redisClient.set(k,v,exTime)
}

def get(k: String): Option[String] = {
import scala.concurrent.duration._
val f = redisClient.get[String](k)
Await.result(f, 1.seconds) //FIXME - really bad need to return future here.
}
}

On Wed, Apr 6, 2016 at 2:42 PM, Till Rohrmann <[hidden email]> wrote:
Hi Balaji,

from the stack trace it looks as if you cannot open a connection redis. Have you checked that you can access redis from all your TaskManager nodes?

Cheers,
Till

On Wed, Apr 6, 2016 at 7:46 AM, Balaji Rajagopalan <[hidden email]> wrote:

I am trying to use AWS EMR yarn cluster where the flink code runs, in one of apply window function, I try to set some values in redis it fails. I have tried to access the same redis with no flink code and get/set works, but from the flink I get  into this exception. Any inputs on what might be going wrong. 

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Error at remote task manager 'some-ip'.

at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:241)

at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:164)

at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)

at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)

at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)

at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)

at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)

at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)

at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)

at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)

at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)

at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)

at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)

at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)

at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)

at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)

at java.lang.Thread.run(Thread.java:745)

Caused by: org.apache.flink.runtime.io.network.partition.ProducerFailedException

at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:164)

at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:96)

at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)

at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)

at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)

at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)

at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)

at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)

at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at io.netty.channel.AbstractChannelHandlerContext.access$500(AbstractChannelHandlerContext.java:32)

at io.netty.channel.AbstractChannelHandlerContext$6.run(AbstractChannelHandlerContext.java:299)

at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)

... 2 more


Caused by: java.lang.RuntimeException: java.net.ConnectException: Connection refused

        at com.redis.IO$class.connect(IO.scala:37)

        at com.redis.RedisClient.connect(RedisClient.scala:94)

        at com.redis.RedisCommand$class.initialize(RedisClient.scala:71)

        at com.redis.RedisClient.initialize(RedisClient.scala:94)

        at com.redis.RedisClient.<init>(RedisClient.scala:98)

        at com.redis.RedisClientFactory.makeObject(Pool.scala:12)

        at com.redis.RedisClientFactory.makeObject(Pool.scala:7)

        at org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:149)

        at com.redis.RedisClientPool.withClient(Pool.scala:34)

        at com.olacabs.peakpricing.utils.DriverStreamHelper$.set(DriverStreamHelper.scala:57)

        at com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:35)

        at com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:29)

        at org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:312)

        at org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:583)

        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:44)

        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:33)

        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processTriggerResult(WindowOperator.java:256)

        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:287)

        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)

        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)

        at java.lang.Thread.run(Thread.java:745)

Caused by: java.net.ConnectException: Connection refused

        at java.net.PlainSocketImpl.socketConnect(Native Method)

        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)

        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)

        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)

        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)

        at java.net.Socket.connect(Socket.java:589)

        at java.net.Socket.connect(Socket.java:538)

        at java.net.Socket.<init>(Socket.java:434)

        at java.net.Socket.<init>(Socket.java:211)





Reply | Threaded
Open this post in threaded view
|

Re: RemoteTransportException when trying to redis in flink code

Till Rohrmann
Great to hear that you solved your problem :-)

On Wed, Apr 6, 2016 at 2:29 PM, Balaji Rajagopalan <[hidden email]> wrote:
Till,
  Found the issue, it was my bad assumption about GlobalConfiguration, what I thought was once the configuration is read from the client machine GlobalConfiguration params will passed on to the task manager nodes, as well, it was not and values from default was getting pickup, which was localhost 6379 and there was no redis running in localhost of task manager. 

balaji 

On Wed, Apr 6, 2016 at 3:29 PM, Till Rohrmann <[hidden email]> wrote:
Hmm I'm not a Redis expert, but are you sure that you see a successful ping reply in the logs of the TaskManagers and not only in the client logs? 

Another thing: Is the redisClient thread safe? Multiple map tasks might be accessing the set and get methods concurrently.

Another question: The code of DriverStreamHelper you've just sent is not the code you've used when receiving the stack trace, right? Because in the stack trace it's written that you access a RedisClientPool from the DriverStreamHelper.set method.

Cheers,
Till


On Wed, Apr 6, 2016 at 11:42 AM, Balaji Rajagopalan <[hidden email]> wrote:
Till,
  I have checked from all the taskmanager nodes I am able to establish a connection by installing a redis-cli on those nodes. The thing is in the constructor I am able to set and get values, also I am getting PONG for the ping. But once object is initialized when I try to call DriverStreamHelper.get and DriverStreamHelper.set from map/apply function I get the connection refused. This may not be related to flink but rather to some security setting with Amazon AWS EMR, this is assumption now. I have also tried with 3 different redis libraries to rule out any errors with libraries the same exception in all. 

object DriverStreamHelper {


implicit val akkaSystem = akka.actor.ActorSystem("flink-actorsystem")

val redisClient = RedisClient(host=redisHost, port=redisPort)

val p = redisClient.ping()
p.map{ res => LOG.info(s"Reply from Redis client : $res") }



val postFix = System.currentTimeMillis()
val key = "some-key" + postFix
val value = "some-value" + postFix
set(key, value, Some(10000L))
LOG.info(s"Going to get the value from Redis ${get(key)}")

def set(k: String, v: String): Unit = {
redisClient.set(k,v)
}
  def set(k: String, v: String, exTime: Option[Long]): Unit = {
redisClient.set(k,v,exTime)
}

def get(k: String): Option[String] = {
import scala.concurrent.duration._
val f = redisClient.get[String](k)
Await.result(f, 1.seconds) //FIXME - really bad need to return future here.
}
}

On Wed, Apr 6, 2016 at 2:42 PM, Till Rohrmann <[hidden email]> wrote:
Hi Balaji,

from the stack trace it looks as if you cannot open a connection redis. Have you checked that you can access redis from all your TaskManager nodes?

Cheers,
Till

On Wed, Apr 6, 2016 at 7:46 AM, Balaji Rajagopalan <[hidden email]> wrote:

I am trying to use AWS EMR yarn cluster where the flink code runs, in one of apply window function, I try to set some values in redis it fails. I have tried to access the same redis with no flink code and get/set works, but from the flink I get  into this exception. Any inputs on what might be going wrong. 

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Error at remote task manager 'some-ip'.

at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:241)

at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:164)

at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)

at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)

at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)

at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)

at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)

at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)

at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)

at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)

at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)

at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)

at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)

at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)

at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)

at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)

at java.lang.Thread.run(Thread.java:745)

Caused by: org.apache.flink.runtime.io.network.partition.ProducerFailedException

at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:164)

at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:96)

at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)

at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)

at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)

at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)

at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)

at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)

at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)

at io.netty.channel.AbstractChannelHandlerContext.access$500(AbstractChannelHandlerContext.java:32)

at io.netty.channel.AbstractChannelHandlerContext$6.run(AbstractChannelHandlerContext.java:299)

at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)

... 2 more


Caused by: java.lang.RuntimeException: java.net.ConnectException: Connection refused

        at com.redis.IO$class.connect(IO.scala:37)

        at com.redis.RedisClient.connect(RedisClient.scala:94)

        at com.redis.RedisCommand$class.initialize(RedisClient.scala:71)

        at com.redis.RedisClient.initialize(RedisClient.scala:94)

        at com.redis.RedisClient.<init>(RedisClient.scala:98)

        at com.redis.RedisClientFactory.makeObject(Pool.scala:12)

        at com.redis.RedisClientFactory.makeObject(Pool.scala:7)

        at org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:149)

        at com.redis.RedisClientPool.withClient(Pool.scala:34)

        at com.olacabs.peakpricing.utils.DriverStreamHelper$.set(DriverStreamHelper.scala:57)

        at com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:35)

        at com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:29)

        at org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:312)

        at org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:583)

        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:44)

        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:33)

        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processTriggerResult(WindowOperator.java:256)

        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:287)

        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)

        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)

        at java.lang.Thread.run(Thread.java:745)

Caused by: java.net.ConnectException: Connection refused

        at java.net.PlainSocketImpl.socketConnect(Native Method)

        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)

        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)

        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)

        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)

        at java.net.Socket.connect(Socket.java:589)

        at java.net.Socket.connect(Socket.java:538)

        at java.net.Socket.<init>(Socket.java:434)

        at java.net.Socket.<init>(Socket.java:211)