I am running Flink code on an AWS EMR YARN cluster. In one of my window apply functions I try to set some values in Redis, and it fails. I can get and set values against the same Redis instance from code that does not use Flink, but from Flink I run into the exception below. Any input on what might be going wrong?

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Error at remote task manager 'some-ip'.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:241)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:164)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.runtime.io.network.partition.ProducerFailedException
    at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:164)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:96)
    at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
    at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
    at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
    at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
    at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
    at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
    at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.access$500(AbstractChannelHandlerContext.java:32)
    at io.netty.channel.AbstractChannelHandlerContext$6.run(AbstractChannelHandlerContext.java:299)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    ... 2 more
Caused by: java.lang.RuntimeException: java.net.ConnectException: Connection refused
    at com.redis.IO$class.connect(IO.scala:37)
    at com.redis.RedisClient.connect(RedisClient.scala:94)
    at com.redis.RedisCommand$class.initialize(RedisClient.scala:71)
    at com.redis.RedisClient.initialize(RedisClient.scala:94)
    at com.redis.RedisClient.<init>(RedisClient.scala:98)
    at com.redis.RedisClientFactory.makeObject(Pool.scala:12)
    at com.redis.RedisClientFactory.makeObject(Pool.scala:7)
    at org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:149)
    at com.redis.RedisClientPool.withClient(Pool.scala:34)
    at com.olacabs.peakpricing.utils.DriverStreamHelper$.set(DriverStreamHelper.scala:57)
    at com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:35)
    at com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:29)
    at org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:312)
    at org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:583)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:44)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:33)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processTriggerResult(WindowOperator.java:256)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:287)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at java.net.Socket.connect(Socket.java:538)
    at java.net.Socket.<init>(Socket.java:434)
    at java.net.Socket.<init>(Socket.java:211)
Hi Balaji,

from the stack trace it looks as if you cannot open a connection to Redis. Have you checked that you can access Redis from all your TaskManager nodes?

Cheers,
Till

On Wed, Apr 6, 2016 at 7:46 AM, Balaji Rajagopalan <[hidden email]> wrote:
Till,

I have checked from all the TaskManager nodes: after installing redis-cli on those nodes, I am able to establish a connection from each of them. In the constructor I am able to set and get values, and I also get PONG in reply to a ping. But once the object is initialized, calling DriverStreamHelper.get and DriverStreamHelper.set from a map/apply function gives me the connection refused error. This may not be related to Flink at all but rather to some security setting in AWS EMR; that is only an assumption for now. I have also tried three different Redis libraries to rule out a library bug, and I get the same exception with all of them.

object DriverStreamHelper {
  def set(k: String, v: String, exTime: Option[Long]): Unit = {
  }
}

On Wed, Apr 6, 2016 at 2:42 PM, Till Rohrmann <[hidden email]> wrote:
Hmm, I'm not a Redis expert, but are you sure that you see a successful ping reply in the logs of the TaskManagers and not only in the client logs?

Another thing: is the redisClient thread safe? Multiple map tasks might be accessing the set and get methods concurrently.

Another question: the code of DriverStreamHelper you've just sent is not the code you were running when you got the stack trace, right? The stack trace shows that DriverStreamHelper.set accesses a RedisClientPool.

Cheers,
Till

On Wed, Apr 6, 2016 at 11:42 AM, Balaji Rajagopalan <[hidden email]> wrote:
Till,

I found the issue: it was a bad assumption on my part about GlobalConfiguration. I thought that once the configuration had been read on the client machine, the GlobalConfiguration parameters would be passed on to the TaskManager nodes as well. They were not, so the default values were being picked up instead, which were localhost and 6379, and there was no Redis running on localhost on the TaskManagers.

balaji

On Wed, Apr 6, 2016 at 3:29 PM, Till Rohrmann <[hidden email]> wrote:
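
For readers who hit the same problem, here is a minimal sketch of one way to avoid it, not the code used in this thread. Flink serializes user function objects on the client and deserializes them on the TaskManagers, so values passed through a function's constructor travel with the job, whereas a JVM-wide singleton is initialized afresh on each worker. The names RedisSettings, RedisWriter, ShipSettingsDemo and the host redis.internal.example are hypothetical, and the serialization round trip only imitates the shipping step without using Flink or a Redis client.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical settings holder; case classes are serializable by default.
final case class RedisSettings(host: String, port: Int)

// Hypothetical stand-in for a user function. The settings arrive through the
// constructor, so they are part of the serialized object instead of being
// looked up from worker-local configuration.
class RedisWriter(settings: RedisSettings) extends java.io.Serializable {
  // A real job would create its Redis client on the worker from these values;
  // here we only expose the address the client would connect to.
  def target: String = s"${settings.host}:${settings.port}"
}

object ShipSettingsDemo {
  // Java-serialization round trip, imitating how a function object is
  // shipped from the client to a worker JVM.
  def roundTrip[T <: java.io.Serializable](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    // Assumed host and port, read on the client side.
    val writer = new RedisWriter(RedisSettings("redis.internal.example", 6379))
    val shipped = roundTrip(writer)
    // The shipped copy still carries the client-side values.
    println(shipped.target)
  }
}
```

The point of the design is that the address is fixed at job construction time on the client, so a worker never falls back to a default such as localhost.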
Great to hear that you solved your problem :-)

On Wed, Apr 6, 2016 at 2:29 PM, Balaji Rajagopalan <[hidden email]> wrote: