API request to submit job takes over 1hr

Shannon Carey
Hi folks,

I have deployed a Flink cluster on top of YARN in an AWS EMR cluster in my test environment, and everything works fine there. However, I am unable to submit jobs to the prod cluster.

Uploading the JAR containing a Flink job succeeds. However, the request to run the job (the UI makes an API request to /jars/<jarname>/run?<params>) takes so long that the ELB eventually returns a 504 GATEWAY_TIMEOUT response. This happens even with the ELB timeout set to 1 hr: the request returns 504 after 1 hr. The request also appears to fail server-side, since no job has ever shown up in the UI in any status (successful, failed, completed or otherwise). Interestingly, shortly after the request is made, other requests from the UI to the API sometimes (but not always) start taking longer than usual, although they all eventually complete.

No interesting/suspicious log entries have been found. All YARN nodes appear healthy.

Does anyone have ideas about what the problem might be? Or ideas about troubleshooting steps I should take?

Also, is 1 GB a reasonable amount of memory for the Flink JobManager? It appears to be using only ~570 MB, but I am not sure whether the JobManager might be misbehaving due to resource constraints. The prod cluster currently consists of six c3.2xlarge EC2 instances. In the yarn-session.sh command, TaskManager memory is set to 10496 MB, JobManager memory to 1024 MB, and the number of slots to 8. Are there any guidelines for JobManager memory allocation?

Thanks very much!
Shannon Carey
Re: API request to submit job takes over 1hr

Shannon Carey
It looks like the problem is due to the stack trace below.

Simply put, when Kafka cannot be reached, the default settings cause job submission to take over (flink.get-partitions.retry * tries by SimpleConsumer * socket.timeout.ms * # of Kafka brokers) = (3 * 2 * 30 * # of Kafka brokers) seconds. Since I have 36 Kafka brokers, it took over 108 minutes (3 * 2 * 30 * 36 = 6480 s). That exceeds the 60-minute maximum idle connection timeout of an AWS ELB, and far exceeds how long most people expect an HTTP request to take. During those 108 minutes and afterwards, short of examining logs and stack traces, it is impossible to tell what is happening with the run-job request: it simply appears to hang and then fail, typically with a 504 Gateway Timeout status.
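The worst case above can be checked with a little arithmetic. This sketch only restates the formula with the numbers from this message (3 Flink retries, 2 SimpleConsumer tries, a 30-second socket timeout, 36 brokers) and assumes every connection attempt blocks for the full socket timeout; the class and method names are illustrative only.

```java
// Worst-case blocking time of Kafka partition discovery during job submission,
// assuming every connection attempt waits for the full socket timeout.
class PartitionDiscoveryTimeout {

    /** flinkRetries * simpleConsumerTries * socketTimeoutSeconds * brokerCount, in seconds. */
    static long worstCaseSeconds(int flinkRetries, int simpleConsumerTries,
                                 int socketTimeoutSeconds, int brokerCount) {
        return (long) flinkRetries * simpleConsumerTries * socketTimeoutSeconds * brokerCount;
    }

    /** Largest broker count whose worst case still fits within budgetSeconds. */
    static long maxBrokersWithin(long budgetSeconds, int flinkRetries,
                                 int simpleConsumerTries, int socketTimeoutSeconds) {
        long perBroker = (long) flinkRetries * simpleConsumerTries * socketTimeoutSeconds;
        return budgetSeconds / perBroker;
    }

    public static void main(String[] args) {
        // Values from the message: flink.get-partitions.retry = 3, 2 SimpleConsumer tries,
        // socket.timeout.ms = 30000 (30 s), 36 brokers.
        long seconds = worstCaseSeconds(3, 2, 30, 36);
        System.out.println(seconds + " s = " + (seconds / 60) + " min");
        // 60-minute AWS ELB idle timeout expressed in seconds.
        System.out.println("brokers that fit in 60 min: " + maxBrokersWithin(3600, 3, 2, 30));
    }
}
```

It prints 6480 s = 108 min, and maxBrokersWithin shows that with these defaults 21 or more unreachable brokers push the worst case past a 60-minute ELB idle timeout.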

There are a couple of problems behind this situation. Let me know if I should move this discussion to the "devs" list; I am not a member there yet. I am happy to file JIRAs, and I could submit a pull request for the change to FlinkKafkaConsumer08 (and 09) initialization suggested below.
  1. JarRunHandler is given a timeout value, but that value is ignored when calling getJobGraphAndClassLoader(). As a result, HTTP "run" requests can take arbitrarily long, and meanwhile the status of both the request and the job is unclear. Depending on the semantics of the work that method does, perhaps it could be made asynchronous with a timeout?
  2. FlinkKafkaConsumer08's constructor (like the Kafka 0.9 consumer's constructor) performs network calls and retries that can take a long time, and that constructor runs beneath getJobGraphAndClassLoader() via the main() method of the submitted Flink job. That work (retrieving Kafka partition info) does not need to happen in the constructor. Instead, it should happen when the job is asked to start, either by overriding AbstractRichFunction#open() or at the top of the run() method. Alternatively, though no better, the signature of StreamExecutionEnvironment#addSource() could be changed to accept some kind of Factory<SourceFunction> so that instantiation is deferred until needed.
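The change proposed in point 2 can be sketched without any Flink dependencies. The hypothetical LazyKafkaSource below only stores its configuration in the constructor (which runs inside main() during submission) and defers the broker query to an open() method, standing in for AbstractRichFunction#open() on the worker; PartitionLookup is a hypothetical placeholder for the Kafka metadata request, not a real Kafka or Flink API.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration of deferring broker queries from the constructor to open().
// It mimics the Flink lifecycle without depending on Flink classes.
class LazySourceDemo {

    /** Hypothetical placeholder for a Kafka metadata (partition) request. */
    interface PartitionLookup extends Serializable {
        List<Integer> partitionsFor(String topic);
    }

    static class LazyKafkaSource implements Serializable {
        private final String topic;
        private final PartitionLookup lookup;
        // Filled in on the worker; transient so it is not serialized with the job.
        private transient List<Integer> partitions;

        LazyKafkaSource(String topic, PartitionLookup lookup) {
            // Runs inside the job's main() during submission: store configuration only,
            // no network calls.
            this.topic = topic;
            this.lookup = lookup;
        }

        /** Stand-in for AbstractRichFunction#open(), called on the worker before run(). */
        void open() {
            List<Integer> fetched = new ArrayList<>(lookup.partitionsFor(topic));
            Collections.sort(fetched);
            partitions = fetched;
        }

        boolean isInitialized() {
            return partitions != null;
        }

        List<Integer> partitions() {
            if (partitions == null) {
                throw new IllegalStateException("open() has not been called");
            }
            return Collections.unmodifiableList(partitions);
        }
    }

    public static void main(String[] args) {
        int[] lookups = {0};
        LazyKafkaSource source = new LazyKafkaSource("events", topic -> {
            lookups[0]++; // would be a (possibly slow) broker round trip
            return Arrays.asList(2, 0, 1);
        });
        System.out.println("lookups after constructor: " + lookups[0]);
        source.open();
        System.out.println("lookups after open(): " + lookups[0] + ", partitions " + source.partitions());
    }
}
```

With this shape, a slow or unreachable broker only delays open() on the workers, not the HTTP request that runs main(). The trade-off is that each parallel instance now performs its own query.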
"nioEventLoopGroup-3-14" #41 prio=10 os_prio=0 tid=0x00007fd0e870b000 nid=0x167d runnable [0x00007fd0cefcb000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.Net.poll(Native Method)
at sun.nio.ch.SocketChannelImpl.poll(SocketChannelImpl.java:954)
- locked <0x000000076a190060> (a java.lang.Object)
at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:110)
- locked <0x000000076a1900f0> (a java.lang.Object)
at kafka.network.BlockingChannel.liftedTree1$1(BlockingChannel.scala:59)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:49)
- locked <0x000000076a190180> (a java.lang.Object)
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:55)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:77)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
- locked <0x000000076a190238> (a java.lang.Object)
at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:91)
at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:68)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:521)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:218)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:193)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:160)
at com.expedia.www.hendrix.flinkproto.ProofOfConcept$.main(ProofOfConcept.scala:60)
at com.expedia.www.hendrix.flinkproto.ProofOfConcept.main(ProofOfConcept.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:80)
at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:215)
at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:95)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:50)
at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.respondAsLeader(RuntimeMonitorHandler.java:135)
at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:112)
at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:60)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:158)
at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)

From: Shannon Carey <[hidden email]> on behalf of Shannon Carey <[hidden email]>
Date: Wednesday, June 1, 2016 at 12:54 PM
To: "[hidden email]" <[hidden email]>
Subject: API request to submit job takes over 1hr

Re: API request to submit job takes over 1hr

rmetzger0
Hi Shannon,

thank you for further investigating the issue.
It's fine to keep the discussion on the user@ list. Most devs are on the user list as well, and we'll probably file some JIRAs.

Regarding your suggestions:
1. I'm not sure making the job submission non-blocking is a good idea. We would probably need to interrupt the submitting thread after a while, which does not always work: in our experience, Kafka and Hadoop, for example, often ignore interrupts or, even worse, get stuck afterwards. This would just hide the problems or introduce new issues.

2. As you correctly identified, the real issue here is that the Kafka consumer queries the brokers for metadata from the constructor (= on the client), not from the workers in the cluster (in the open() method).
Changing this behavior is on my to-do list. If you want, you can file a JIRA for it. If you also have time to work on it, you can of course open a pull request as well; otherwise, contributors from the Flink community can take care of the implementation.
The main reasons we do the querying centrally are: a) to avoid overloading the brokers, and b) to send the same list of partitions (in the same order) to all parallel consumers, so that they get a fixed partition assignment (also across restarts). If we do the querying in the open() method, we need to make sure that all partitions are assigned, without duplicates (also after restarts in case of failures).
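Robert's caveat in point 1 can be demonstrated with a small sketch of the "asynchronous with a timeout" idea, using only java.util.concurrent. The caller gets a TimeoutException after the deadline and cancel(true) interrupts the worker, but a task that swallows interrupts (simulated here; in practice this would be a client library misbehaving as described above) keeps running and keeps its thread. The class and method names are hypothetical and are not Flink's JarRunHandler code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SubmissionTimeoutDemo {

    /** Runs work on another thread and waits at most timeoutMillis for its result. */
    static <T> T callWithTimeout(ExecutorService executor, Callable<T> work, long timeoutMillis)
            throws Exception {
        Future<T> future = executor.submit(work);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Only requests an interrupt; code that swallows it keeps running.
            future.cancel(true);
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch finished = new CountDownLatch(1);
        // Simulates a client library that ignores interrupts for about 500 ms.
        Callable<String> stubbornWork = () -> {
            long deadline = System.currentTimeMillis() + 500;
            while (System.currentTimeMillis() < deadline) {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException ignored) {
                    // Interrupt deliberately swallowed.
                }
            }
            finished.countDown();
            return "done";
        };
        try {
            callWithTimeout(executor, stubbornWork, 100);
        } catch (TimeoutException e) {
            System.out.println("caller timed out after ~100 ms");
        }
        // The caller has given up, but the worker thread is still busy.
        System.out.println("work still running: " + (finished.getCount() == 1));
        finished.await();
        executor.shutdown();
    }
}
```

The HTTP caller gets a prompt answer, but the thread is still occupied until the stubborn work finishes on its own, which is why a timeout alone can hide the problem.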

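Point b) can be sketched as follows, assuming a simple round-robin over a sorted partition list (a hypothetical illustration, not the connector's actual assignment code). Each parallel subtask keeps the partitions whose position in the sorted list, modulo the parallelism, equals its own subtask index. As long as every subtask sees the same list, every partition is assigned exactly once; if subtasks queried the brokers independently in open() and one of them saw a different list, positions would shift and partitions could be duplicated or skipped.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class FixedPartitionAssignment {

    /**
     * Returns the partitions owned by one parallel subtask. Every subtask must receive the
     * same partition list; sorting makes the result independent of the order in which the
     * brokers reported the partitions.
     */
    static List<Integer> assign(List<Integer> allPartitions, int subtaskIndex, int parallelism) {
        List<Integer> sorted = new ArrayList<>(allPartitions);
        Collections.sort(sorted);
        List<Integer> mine = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++) {
            // Round-robin: position i goes to subtask i % parallelism.
            if (i % parallelism == subtaskIndex) {
                mine.add(sorted.get(i));
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<Integer> partitions = Arrays.asList(5, 3, 0, 4, 1, 2, 6);
        int parallelism = 3;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            System.out.println("subtask " + subtask + " -> " + assign(partitions, subtask, parallelism));
        }
    }
}
```

With partitions [5, 3, 0, 4, 1, 2, 6] and a parallelism of 3, subtask 0 gets [0, 3, 6], subtask 1 gets [1, 4] and subtask 2 gets [2, 5]. The sketch assumes a single topic; multiple topics would need a (topic, partition) key with a stable sort order.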
Regards,
Robert




On Thu, Jun 2, 2016 at 1:44 AM, Shannon Carey <[hidden email]> wrote:

Re: API request to submit job takes over 1hr

Shannon Carey
Robert, 

Thanks for your thoughtful response.

  1. I understand your concern. User code is not guaranteed to respond to thread interrupts, so whatever you do, you may end up with a stuck thread. But I think we can improve the user experience. First, we can update the documentation to make it clear that the main() method is executed during job submission, and that jobs should avoid doing time-consuming work in main(). Second, I still think it's in your best interest to decouple the job submission thread from the HTTP thread. That would un-hide the problem, because the end user could see that their job request has been started but is not getting past a certain point (maybe it's in one phase/status before main() executes, and in a different status once main() completes). Also, if a user has made (and failed or aborted) multiple job submission API requests, it would be obvious that those requests are still occupying threads. Right now, it's impossible to tell what has happened to a request, or whether it is occupying a thread, without relying on log output (which took us a while to get right in AWS EMR YARN) or a stack dump. Ideally, the UI should be able to list all the threads currently working on job submission.
  2. I see, so the main method executes on the Application Master, right? I created https://issues.apache.org/jira/browse/FLINK-4069. Unfortunately, I don't understand very well how Kafka brokers and clients cooperate to make sure that partitions are distributed across consumers that share a group id (is there documentation about that somewhere?)… Also, I'm not sure how Flink deals with repartitioning.
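The decoupling in point 1 can be sketched as a hypothetical SubmissionTracker (not an existing Flink class): an HTTP handler would call submit(), return the id immediately, and let the UI poll status(), while inFlight() lists submissions whose main() is still occupying a thread. The status names are assumptions loosely based on the phases described above. This does not stop a stuck main(); it only makes it visible.

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Hypothetical tracker that runs a job's main() off the HTTP thread and records its phase.
class SubmissionTracker {

    enum Status { QUEUED, RUNNING_MAIN, MAIN_COMPLETED, FAILED }

    private final ExecutorService executor = Executors.newCachedThreadPool();
    private final Map<String, Status> statuses = new ConcurrentHashMap<>();

    /** Starts jobMain asynchronously and returns an id the client can poll. */
    String submit(Runnable jobMain) {
        String id = UUID.randomUUID().toString();
        statuses.put(id, Status.QUEUED);
        executor.execute(() -> {
            statuses.put(id, Status.RUNNING_MAIN);
            try {
                jobMain.run();
                statuses.put(id, Status.MAIN_COMPLETED);
            } catch (RuntimeException e) {
                statuses.put(id, Status.FAILED);
            }
        });
        return id;
    }

    Status status(String id) {
        return statuses.get(id);
    }

    /** Submissions that are still occupying (or waiting for) a thread. */
    List<String> inFlight() {
        return statuses.entrySet().stream()
                .filter(e -> e.getValue() == Status.QUEUED || e.getValue() == Status.RUNNING_MAIN)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    void shutdown() {
        executor.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        SubmissionTracker tracker = new SubmissionTracker();
        CountDownLatch release = new CountDownLatch(1);
        String ok = tracker.submit(() -> { });
        // Stands in for a main() blocked on unreachable Kafka brokers.
        String stuck = tracker.submit(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        String broken = tracker.submit(() -> { throw new IllegalStateException("bad job"); });
        Thread.sleep(200); // demo only: give the worker threads time to run
        System.out.println("ok=" + tracker.status(ok) + ", broken=" + tracker.status(broken));
        System.out.println("stuck submission visible: " + tracker.inFlight().contains(stuck));
        release.countDown();
        tracker.shutdown();
    }
}
```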
-Shannon

From: Robert Metzger <[hidden email]>
Date: Thursday, June 2, 2016 at 4:19 AM
To: "[hidden email]" <[hidden email]>
Subject: Re: API request to submit job takes over 1hr

Hi Shannon,

thank you for further investigating the issue.
Its fine to keep the discussion on the user@ list. Most devs are on the user list as well and we'll probably file some JIRAs.

Regarding your suggestions:
1. Not sure if making the job submission non-blocking is a good idea. We would probably need to interrupt the submitting thread after a while, which does not always work (we made the experience that Kafka and Hadoop for example often ignore interrupts, or even worse gets stuck afterwards). This would just hide the problems or introduce new issues.

2. As you've identified correctly, the real issue here is that the Kafka consumer is querying the brokers for metadata from the constructor (= on the client) not from the workers in the cluster (in the open() method).
Changing the behavior is on my todo list. If you want, you can file a JIRA for this. If you have also time to work on this, you can of course also open a pull request. Otherwise, some contributors from the Flink community can take care of the implementation.
The main reason why we do the querying centrally is: a) avoid overloading the brokers b) send the same list of partitions (in the same order) to all parallel consumers to do a fixed partition assignments (also across restarts). When we do the querying in the open() method, we need to make sure that all partitions are assigned, without duplicates (also after restarts in case of failures).

Regards,
Robert




On Thu, Jun 2, 2016 at 1:44 AM, Shannon Carey <[hidden email]> wrote:
It looks like the problem is due to the stack trace below.

Simply put, connection failure to Kafka when using the default settings causes job submission to take over (flink.get-partitions.retry * tries by SimpleConsumer * socket.timeout.ms * # of Kafka brokers) = (3 * 2 * 30 * (# of Kafka brokers)) seconds. In my case, since I have 36 Kafka brokers, it took over 108 minutes. This is beyond the maximum idle connection timeout of an AWS ELB of 60 minutes, and beyond the normal length of time most people expect an HTTP request to take. During these 108 minutes and after, aside from examining logs & stack traces, it is not possible to determine what is happening with regard to the run job request. It simply appears to hang and then fail, typically with a 504 Gateway Timeout status.

There are a couple problems that are responsible for this situation. Let me know if I should move this discussion to the "devs" list: I am not a member there yet. I am happy to submit JIRAs and I would be able to submit a Pull Request for the change to FlinkKafkaConsumer08 (and 09) initialization as suggested below.
  1. JarRunHandler is provided with a timeout value, but that timeout value is ignored when calling getJobGraphAndClassLoader(). This allows HTTP "run" requests to take arbitrary amounts of time during which the status of the request and the job is unclear. Depending on the semantics of the work that method does, perhaps it could be made asynchronous with a timeout?
  2. FlinkKafkaConsumer08's constructor (as well as the Kafka 0.9 consumer's constructor) performs network interaction & retries that can take a long time, and the constructor is in the execution path beneath getJobGraphAndClassLoader() via the main() method of the submitted Flink job. It is not necessary to do that work (retrieving Kafka partition info) in the constructor. Instead, that work should occur when the job is asked to start, either by overriding the AbstractRichFunction#open() method or by adding it to the top of the run() method. Alternatively, though not any better, the signature of StreamExecutionEnvironment#addSource() could be changed to take some kind of Factory<SourceFunction> so that instantiation is deferred until necessary.
"nioEventLoopGroup-3-14" #41 prio=10 os_prio=0 tid=0x00007fd0e870b000 nid=0x167d runnable [0x00007fd0cefcb000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.Net.poll(Native Method)
at sun.nio.ch.SocketChannelImpl.poll(SocketChannelImpl.java:954)
- locked <0x000000076a190060> (a java.lang.Object)
at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:110)
- locked <0x000000076a1900f0> (a java.lang.Object)
at kafka.network.BlockingChannel.liftedTree1$1(BlockingChannel.scala:59)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:49)
- locked <0x000000076a190180> (a java.lang.Object)
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:55)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:77)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
- locked <0x000000076a190238> (a java.lang.Object)
at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:91)
at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:68)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:521)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:218)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:193)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:160)
at com.expedia.www.hendrix.flinkproto.ProofOfConcept$.main(ProofOfConcept.scala:60)
at com.expedia.www.hendrix.flinkproto.ProofOfConcept.main(ProofOfConcept.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:80)
at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:215)
at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:95)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:50)
at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.respondAsLeader(RuntimeMonitorHandler.java:135)
at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:112)
at org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.channelRead0(RuntimeMonitorHandler.java:60)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
at io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:158)
at org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)

From: Shannon Carey <[hidden email]> on behalf of Shannon Carey <[hidden email]>
Date: Wednesday, June 1, 2016 at 12:54 PM
To: "[hidden email]" <[hidden email]>
Subject: API request to submit job takes over 1hr



Re: API request to submit job takes over 1hr

Tzu-Li Tai
Hi Shannon,

Thanks for your investigation of the issue and for the JIRA. There's actually an earlier JIRA on this problem already: https://issues.apache.org/jira/browse/FLINK-4023. Would you be OK with tracking this issue on FLINK-4023 and closing FLINK-4069 as a duplicate? As you can see, I've also linked FLINK-4069 from FLINK-4023 so that your additional information on the problem is available there.

To help answer your last questions:
1. We do the partition distribution across consumers ourselves: the Kafka consumer connector creates a Kafka client on each subtask, and each subtask independently determines which partitions it is in charge of. The last FAQ section of this blog post has more information: http://data-artisans.com/kafka-flink-a-practical-how-to/. As Robert has mentioned, the consumer currently depends on the same fixed, ordered list of partitions being sent to all subtasks, so that each of them always determines the same set of partitions to fetch from across restarts.
2. Following from the above, the consumer currently subscribes only to the fixed partition list queried in the constructor. So at the moment the Flink Kafka consumer doesn't handle repartitioning of topics, but it's definitely on the to-do list for the Kafka connector and won't be too hard to implement once the issue with querying partitions in the consumer's constructor is resolved (perhaps Robert can clarify this a bit more).
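Gordon's point 1 can be illustrated with a minimal Java sketch. It assumes a simple round-robin rule (the partition at index i goes to subtask i % parallelism); the class and method names are hypothetical, and this is not the connector's actual code. Because every subtask receives the same ordered list, each one computes a disjoint share without coordinating with the others, and that share stays the same across restarts as long as the list and the parallelism do not change.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of deterministic, coordination-free partition assignment.
// Assumes a round-robin rule; not the Flink Kafka connector's real implementation.
public class PartitionAssignmentSketch {

    /**
     * Returns the partitions that subtask {@code subtaskIndex} (0-based) out of
     * {@code parallelism} subtasks should read. Every subtask calls this with the
     * same ordered list, so the resulting shares are disjoint and cover the list.
     */
    public static List<String> assignPartitions(List<String> orderedPartitions,
                                                int parallelism, int subtaskIndex) {
        List<String> mine = new ArrayList<>();
        for (int i = 0; i < orderedPartitions.size(); i++) {
            if (i % parallelism == subtaskIndex) {
                mine.add(orderedPartitions.get(i));
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        // Fixed, ordered partition list, queried once up front
        // (per the thread, this happens in the consumer's constructor).
        List<String> partitions = Arrays.asList("topic-0", "topic-1", "topic-2", "topic-3", "topic-4");
        int parallelism = 2;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            System.out.println("subtask " + subtask + " -> "
                    + assignPartitions(partitions, parallelism, subtask));
        }
    }
}
```

Running `main` prints `subtask 0 -> [topic-0, topic-2, topic-4]` and `subtask 1 -> [topic-1, topic-3]`. It also shows point 2: a partition added to the topic after the list was queried is simply not in `partitions`, so no subtask ever reads it.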

Best,
Gordon

Re: API request to submit job takes over 1hr

rmetzger0
Hi,

Regarding Shannon's first point: I agree. We can improve the user experience a lot, and documenting the behavior is the first step we should take here.
I see your points. I agree that we should use a separate thread for running the main method and report more clearly to the front end what's happening.
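Robert's suggestion of running the main method in a separate thread could look roughly like the Java sketch below. Everything in it is hypothetical (the class, the `Status` values, and `submit`/`status`/`error` are not Flink's `JarRunHandler` API): the HTTP handler hands `main()` to a background executor and returns a submission id immediately, and the UI polls the status instead of holding a single HTTP request open for the whole duration of `main()`.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: NOT Flink's web-monitor API. Names and statuses are illustrative only.
public class AsyncSubmissionSketch {

    public enum Status { RUNNING_MAIN, SUBMITTED, FAILED }

    // Background threads that run user main() methods, so HTTP handler threads are never blocked.
    private final ExecutorService mainRunner = Executors.newCachedThreadPool();
    private final Map<String, Status> statuses = new ConcurrentHashMap<>();
    private final Map<String, String> errors = new ConcurrentHashMap<>();

    /** Called by a hypothetical "run jar" handler; returns an id without waiting for main(). */
    public String submit(Runnable userMain) {
        String id = UUID.randomUUID().toString();
        statuses.put(id, Status.RUNNING_MAIN);
        mainRunner.execute(() -> {
            try {
                userMain.run(); // may block for a long time, e.g. while retrying Kafka metadata requests
                statuses.put(id, Status.SUBMITTED);
            } catch (RuntimeException e) {
                // Record the error before flipping the status, so a poller that sees FAILED also sees the message.
                errors.put(id, String.valueOf(e.getMessage()));
                statuses.put(id, Status.FAILED);
            }
        });
        return id;
    }

    /** What a hypothetical status endpoint polled by the UI would return. */
    public Status status(String id) {
        return statuses.get(id);
    }

    public String error(String id) {
        return errors.get(id);
    }

    /** Stops accepting work and waits (up to 10 s) for running main() calls to finish. */
    public void shutdown() {
        mainRunner.shutdown();
        try {
            mainRunner.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        AsyncSubmissionSketch handler = new AsyncSubmissionSketch();
        // Simulated user main() that fails the way an unreachable Kafka cluster might.
        String id = handler.submit(() -> {
            throw new IllegalStateException("could not fetch Kafka partitions");
        });
        System.out.println("submit() returned id " + id);
        handler.shutdown();
        System.out.println("final status: " + handler.status(id) + " (" + handler.error(id) + ")");
    }
}
```

The design choice is to keep every HTTP request short. A slow `main()` then can no longer run into the ELB idle timeout, and a failure gets reported as a `FAILED` status with a message instead of an opaque 504.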

Re 2.: When a job is submitted through the web interface, the main() method is executed in the Application Master / JobManager / web interface JVM.
When using the ./bin/flink tool, it's executed in that client's JVM.
As Gordon said correctly, Flink doesn't pick up newly added partitions. Leader changes have been handled transparently since 1.0.0.

I hope that clarifies things.

On Tue, Jun 14, 2016 at 5:57 AM, Tzu-Li (Gordon) Tai <[hidden email]> wrote:



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/API-request-to-submit-job-takes-over-1hr-tp7319p7558.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.