It looks like the problem is due to the stack trace below.
Simply put, connection failure to Kafka when using the default settings causes job submission to take over (flink.get-partitions.retry * tries by SimpleConsumer * socket.timeout.ms * # of Kafka brokers) = (3 * 2 * 30 * (# of Kafka brokers)) seconds. In my case,
since I have 36 Kafka brokers, it took over 108 minutes. This is beyond the maximum idle connection timeout of an AWS ELB of 60 minutes, and beyond the normal length of time most people expect an HTTP request to take. During these 108 minutes and after, aside
from examining logs & stack traces, it is not possible to determine what is happening with regard to the run job request. It simply appears to hang and then fail, typically with a 504 Gateway Timeout status.
There are a couple problems that are responsible for this situation. Let me know if I should move this discussion to the "devs" list: I am not a member there yet. I am happy to submit JIRAs and I would be able to submit a Pull Request for the change to FlinkKafkaConsumer08
(and 09) initialization as suggested below.
"nioEventLoopGroup-3-14" #41 prio=10 os_prio=0 tid=0x00007fd0e870b000 nid=0x167d runnable [0x00007fd0cefcb000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.Net.poll(Native Method)
at sun.nio.ch.SocketChannelImpl.poll(SocketChannelImpl.java:954)
- locked <0x000000076a190060> (a java.lang.Object)
at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:110)
- locked <0x000000076a1900f0> (a java.lang.Object)
at kafka.network.BlockingChannel.liftedTree1$1(BlockingChannel.scala:59)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:49)
- locked <0x000000076a190180> (a java.lang.Object)
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:55)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:77)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
- locked <0x000000076a190238> (a java.lang.Object)
at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:91)
at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:68)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:521)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:218)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:193)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:160)
at com.expedia.www.hendrix.flinkproto.ProofOfConcept$.main(ProofOfConcept.scala:60)
at com.expedia.www.hendrix.flinkproto.ProofOfConcept.main(ProofOfConcept.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:80)
at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:215)
at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:95)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:50)
at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.respondAsLeader(RuntimeMonitorHandler.java:135)
at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:112)
at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:60)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:158)
at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)