Hi folks,
I have deployed a Flink cluster on top of YARN in an AWS EMR cluster in my test environment, and everything is working fine. However, I am unable to submit jobs to the prod cluster.
Uploading the JAR containing a Flink job succeeds. However, the request to run the job (the UI makes an API request to /jars/<jarname>/run?<params>) takes so long to complete that the ELB finally returns a 504 GATEWAY_TIMEOUT response. This is the case even if the ELB timeout is set to 1hr: the request returns 504 after 1hr. The request also appears to fail server-side, since no jobs have ever shown up in the UI in any status (successful/failed/completed or otherwise). Interestingly, shortly after the request is made, other requests by the UI to the API sometimes (but not always) begin to take longer than usual, although they do all eventually complete.
No interesting/suspicious log entries have been found. All YARN nodes appear healthy.
Does anyone have ideas about what the problem might be? Or ideas about troubleshooting steps I should take?
Also, I was wondering if 1GB is a reasonable amount of memory to use for the Flink Job Manager? It appears to be using only ~570MB but I am not sure if the Job Manager might be misbehaving due to resource constraints. The prod cluster is currently composed
of six c3.2xlarge EC2 instances. Task memory is set to 10496, Job Manager memory is set to 1024, and there are 8 slots set in the yarn-session.sh command. Are there any guidelines for memory allocation for the Job Manager?
Thanks very much!
Shannon Carey
|
It looks like the problem is explained by the stack trace below.
Simply put, when the connection to Kafka fails under the default settings, job submission takes over (flink.get-partitions.retry * tries by SimpleConsumer * socket.timeout.ms * # of Kafka brokers) = (3 * 2 * 30 * (# of Kafka brokers)) seconds. In my case, with 36 Kafka brokers, it took over 108 minutes. That exceeds the 60-minute maximum idle connection timeout of an AWS ELB, and it is far longer than most people expect an HTTP request to take. During those 108 minutes and afterwards, the only way to find out what is happening with the run-job request is to examine logs and stack traces. The request simply appears to hang and then fail, typically with a 504 Gateway Timeout status.
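The arithmetic above can be checked with a short sketch. The function and parameter names are hypothetical and chosen only for illustration; the default values (3 retries, 2 tries by SimpleConsumer, a 30-second socket timeout) are the ones quoted in this message. Since the post reports that submission took "over" this long, treat the result as an estimate rather than an exact figure.

```python
def worst_case_submit_seconds(num_brokers, partition_retries=3,
                              simple_consumer_tries=2, socket_timeout_s=30):
    """Estimate how long the consumer constructor blocks when no broker is reachable.

    Mirrors the formula from the post:
    retries * tries by SimpleConsumer * socket timeout * number of brokers.
    """
    return partition_retries * simple_consumer_tries * socket_timeout_s * num_brokers


# Values reported in the post: default settings, 36 brokers.
default_wait = worst_case_submit_seconds(36)
print(default_wait, "seconds =", default_wait / 60, "minutes")  # 6480 seconds = 108.0 minutes

# Hypothetical tuning: 1 retry and a 5-second socket timeout shrink the estimate.
tuned_wait = worst_case_submit_seconds(36, partition_retries=1, socket_timeout_s=5)
print(tuned_wait, "seconds =", tuned_wait / 60, "minutes")  # 360 seconds = 6.0 minutes
```

Because the estimate grows linearly with the number of brokers, a large cluster passes the 60-minute ELB limit well before a small test cluster would show any problem.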
There are a couple of problems responsible for this situation. Let me know if I should move this discussion to the "devs" list; I am not a member there yet. I am happy to submit JIRAs, and I would be able to submit a pull request for the change to FlinkKafkaConsumer08 (and 09) initialization as suggested below.
"nioEventLoopGroup-3-14" #41 prio=10 os_prio=0 tid=0x00007fd0e870b000 nid=0x167d runnable [0x00007fd0cefcb000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.Net.poll(Native Method)
at sun.nio.ch.SocketChannelImpl.poll(SocketChannelImpl.java:954)
- locked <0x000000076a190060> (a java.lang.Object)
at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:110)
- locked <0x000000076a1900f0> (a java.lang.Object)
at kafka.network.BlockingChannel.liftedTree1$1(BlockingChannel.scala:59)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:49)
- locked <0x000000076a190180> (a java.lang.Object)
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:55)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:77)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
- locked <0x000000076a190238> (a java.lang.Object)
at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:91)
at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:68)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:521)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:218)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:193)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:160)
at com.expedia.www.hendrix.flinkproto.ProofOfConcept$.main(ProofOfConcept.scala:60)
at com.expedia.www.hendrix.flinkproto.ProofOfConcept.main(ProofOfConcept.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:80)
at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:215)
at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:95)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:50)
at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.respondAsLeader(RuntimeMonitorHandler.java:135)
at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:112)
at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:60)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:158)
at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)
From: Shannon Carey <[hidden email]> on behalf of Shannon Carey <[hidden email]>
Date: Wednesday, June 1, 2016 at 12:54 PM
To: "[hidden email]" <[hidden email]>
Subject: API request to submit job takes over 1hr
|
Hi Shannon,
thank you for further investigating the issue.
It's fine to keep the discussion on the user@ list. Most devs are on the user list as well and we'll probably file some JIRAs.
Regarding your suggestions:
1. I'm not sure that making the job submission non-blocking is a good idea. We would probably need to interrupt the submitting thread after a while, which does not always work (in our experience, Kafka and Hadoop, for example, often ignore interrupts or, even worse, get stuck afterwards). This would just hide the problems or introduce new issues.
2. As you've correctly identified, the real issue here is that the Kafka consumer queries the brokers for metadata from the constructor (= on the client), not from the workers in the cluster (in the open() method).
Changing the behavior is on my todo list. If you want, you can file a JIRA for this. If you also have time to work on this, you can of course open a pull request. Otherwise, some contributors from the Flink community can take care of the implementation.
The main reasons why we do the querying centrally are: a) to avoid overloading the brokers, and b) to send the same list of partitions (in the same order) to all parallel consumers so that they do a fixed partition assignment (also across restarts). When we do the querying in the open() method, we need to make sure that all partitions are assigned, without duplicates (also after restarts in case of failures).
Regards,
Robert
On Thu, Jun 2, 2016 at 1:44 AM, Shannon Carey <[hidden email]> wrote:
|
Robert,
Thanks for your thoughtful response.
-Shannon
From: Robert Metzger <[hidden email]>
Date: Thursday, June 2, 2016 at 4:19 AM
To: "[hidden email]" <[hidden email]>
Subject: Re: API request to submit job takes over 1hr
|
Hi Shannon,
Thanks for your investigation on the issue and the JIRA. There's actually a previous JIRA on this problem already: https://issues.apache.org/jira/browse/FLINK-4023. Would you be ok with tracking this issue on FLINK-4023, and close FLINK-4069 as a duplicate issue? As you can see, I've also referenced a link to FLINK-4069 on FLINK-4023 for your additional info on the problem. A little help with answering your last questions: 1. We're doing the partition distribution across consumers ourselves: the Kafka consumer connector creates a Kafka client on subtasks, and each subtask independently determines which partitions it should be in charge of. There's also information on this blog here for more info: http://data-artisans.com/kafka-flink-a-practical-how-to/, on the last FAQ section. As Robert has mentioned, the consumer is currently depending on the fixed ordered list of partitions sent to all subtasks so that each of them always determine the same set of partitions to fetch from across restarts. 2. Following the above description, currently the consumer is only subscribing to the fixed partition list queried in the constructor. So at the moment the Flink Kafka consumer doesn't handle repartitioning of topics, but it's definitely on the todo list for the Kafka connector and won't be too hard to implement once querying in the consumer is resolved (perhaps Robert can clarify this a bit more). Best, Gordon |
Hi,
I see your points. I agree that we should use a separate thread for running the main method and report better to the front end what's happening.
Re 2.: When the job is submitted through the web interface, the main() method is executed in the Application Master / JobManager / Web Interface JVM. When the ./bin/flink tool is used, it's executed there.
As Gordon correctly said, Flink doesn't pick up newly added partitions. Leader changes have been handled transparently since 1.0.0.
I hope that clarifies things.
On Tue, Jun 14, 2016 at 5:57 AM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
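A minimal sketch of the "separate thread plus status reporting" idea, under stated assumptions: the class, method names, and status strings below are hypothetical and do not reflect how Flink's web monitor works. The sketch also makes no attempt to cancel a stuck main method, in line with the earlier caveat in this thread that interrupting such threads does not always work.

```python
import threading
import time
import uuid


class SubmissionTracker:
    """Runs a job's main function in a background thread and records its status."""

    def __init__(self):
        self._lock = threading.Lock()
        self._status = {}

    def submit(self, main_fn):
        """Start main_fn in the background and return a request id immediately."""
        request_id = uuid.uuid4().hex
        with self._lock:
            self._status[request_id] = "RUNNING"

        def run():
            try:
                main_fn()
                outcome = "FINISHED"
            except Exception as exc:  # report the failure instead of losing it
                outcome = f"FAILED: {exc!r}"
            with self._lock:
                self._status[request_id] = outcome

        threading.Thread(target=run, daemon=True).start()
        return request_id

    def status(self, request_id):
        """Return the last recorded status for a submission."""
        with self._lock:
            return self._status[request_id]


def slow_main():
    time.sleep(0.2)  # stand-in for a main() blocked on unreachable brokers


tracker = SubmissionTracker()
request_id = tracker.submit(slow_main)  # returns without waiting for slow_main
print(tracker.status(request_id))
```

With something like this, the HTTP request that starts the job could return right away, and the front end could poll the status instead of holding one connection open past the load balancer's timeout.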