Hey all,

My jobs that I am trying to write in Flink 1.5 are failing after a few minutes. I think it's because the idle task managers are shutting down, which seems to kill the client and the running job, even though the job itself was still running on one of the other task managers. I get:

org.apache.flink.client.program.ProgramInvocationException: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'xxxx'. This might indicate that the remote task manager was lost.
    at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:143)

I happen to have the last part of the flow set to parallelism 1 right now for debugging, so of the 4 task managers that are spun up, 3 of them hit the timeout period (currently set to 240000). I think as soon as the first one goes, the client throws up and the whole job dies as a result.

Is this expected behavior, and if so, is there another way around it? Do I keep increasing slotmanager.taskmanager-timeout to a very large number? I have verified that setting the timeout to 840000 lets the job complete without error.

Thank you!
|
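For reference, slotmanager.taskmanager-timeout is a cluster-side setting in flink-conf.yaml; a minimal sketch, assuming the value is in milliseconds as the numbers quoted in this thread suggest (the value below is illustrative only):

# flink-conf.yaml (illustrative value only)
# how long an idle TaskManager is kept around before it is released
slotmanager.taskmanager-timeout: 240000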
Hi Garrett, I agree, there seems to be an issue and increasing the timeout should not be the right approach to solve it. Are you running streaming or batch jobs, i.e., do some of the tasks finish much earlier than others? I'm adding Till to this thread who's very familiar with scheduling and process communication. Best, Fabian 2018-06-19 0:03 GMT+02:00 Garrett Barton <[hidden email]>:
|
Hi Garrett, the killing of an idle TaskManager should not affect the execution of the job. By definition, a TaskManager only idles if it does not execute any tasks. Could you maybe share the complete logs (of the cluster entrypoint and all TaskManagers) with us? Cheers, Till On Thu, Jun 21, 2018 at 10:26 AM Fabian Hueske <[hidden email]> wrote:
|
Thank you all for the reply! I am running batch jobs: I read in a handful of files from HDFS and output to HBase, HDFS, and Kafka. I run into this when I have partial usage of the cluster as the job runs. Right now I spin up 20 nodes with 3 slots each; my job at peak uses all 60 slots, but by the end of it, since my outputs are all forced to parallelism 1 while I work out kinks, everything typically ends up running in one or two task managers tops. The other 18-19 task managers die off. The problem is that as soon as any task manager dies off, my client throws the above exception and the job fails.

I cannot share logs, but I was thinking about writing a dirt-simple map/reduce flow based on the wordcount example. It would have a wide map phase that generates data, and then I'd run it through a reducer that sleeps maybe 1 second per record. I believe that will simulate my condition very well, where I go from 100% used slots to only 1-2 used slots as I hit that timeout. I'll do that today and let you know; if it works I can share the code here as an example. On Thu, Jun 21, 2018 at 5:01 AM Till Rohrmann <[hidden email]> wrote:
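A rough sketch of the kind of test job described above (not the author's actual code): a wide generating map followed by a parallelism-1 stage that sleeps per record, so most slots go idle long before the job finishes. Class name, record count, and output path are made up for illustration.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class IdleSlotRepro {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Wide phase: spread generated records across every available slot.
        DataSet<Long> generated = env.generateSequence(1, 10_000)
                .map(new MapFunction<Long, Long>() {
                    @Override
                    public Long map(Long value) {
                        return value * 2; // stand-in for real per-record work
                    }
                });

        // Narrow phase: parallelism 1 and ~1 second per record, so the other
        // task managers sit idle long enough to hit the timeout.
        generated.map(new MapFunction<Long, Long>() {
                    @Override
                    public Long map(Long value) throws Exception {
                        Thread.sleep(1000);
                        return value;
                    }
                })
                .setParallelism(1)
                .writeAsText("hdfs:///tmp/idle-slot-repro"); // hypothetical path

        env.execute("idle slot repro");
    }
}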
|
Actually, random thought: could YARN preemption be causing this? What is the failure scenario if a task manager that is doing real work goes down in YARN? The docs make it sound like Flink should fire up another TM and get back to work out of the box, but I'm not seeing that. On Thu, Jun 21, 2018 at 1:20 PM Garrett Barton <[hidden email]> wrote:
|
Hi Garrett, have you set a restart strategy for your job [1]? In order to recover from failures you need to specify one. Otherwise Flink will terminally fail the job in case of a failure. Cheers, Till On Thu, Jun 21, 2018 at 7:43 PM Garrett Barton <[hidden email]> wrote:
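A minimal sketch of what setting a restart strategy on the execution environment can look like, assuming the fixed-delay strategy and made-up attempt/delay values:

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.ExecutionEnvironment;

public class RestartStrategyExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Retry a failed job up to 3 times, waiting 10 seconds between attempts.
        // The numbers here are illustrative, not a recommendation.
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
        // ... define the rest of the flow on env, then call env.execute(...)
    }
}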
|
I don't know why yet, but I did figure it out. After my sample long-running map/reduce test ran fine all night, I tried a ton of things. It turns out there is a difference between env.execute() and collect(). My flow had reading from HDFS, decrypting, processing, and finally writing to HDFS; at each step, though, I was splitting the feed and counting stats for saving later. I was calling collect() on the stat feeds unioned together to bring them locally and determine the validity of my run before I did other things. It looks like collect() was causing the disconnections. When I switched to writing the stats out to HDFS files and calling env.execute(), the flow works fine.

Oh, and thank you for the retry suggestion. I turned it on and watched the job fail 3 times in a row with the same error, so the retry stuff works, which is cool, and I'll use it from now on! (Btw, the docs need updating here https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/fault_tolerance.html since that stuff is deprecated!)

Thank you all as always for being so responsive! On Fri, Jun 22, 2018 at 5:26 AM Till Rohrmann <[hidden email]> wrote:
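A rough before/after sketch of the change described here, with a made-up stats DataSet and output path standing in for the real flow:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class StatsSinkSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the unioned stats feed from the real flow.
        DataSet<Tuple2<String, Long>> statFeed = env.fromElements(
                Tuple2.of("records-in", 100L), Tuple2.of("records-out", 97L));

        // Before (roughly): collect() brings the stats back to the client and,
        // in doing so, submits and runs a job of its own right there:
        // List<Tuple2<String, Long>> stats = statFeed.collect();

        // After: write the stats to HDFS instead and run everything in a
        // single submission at the end.
        statFeed.writeAsCsv("hdfs:///tmp/run-stats"); // hypothetical path
        env.execute("full flow");
    }
}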
|
Great to hear that you could solve your problem, Garrett. What happens when you call `collect` is that Flink sends the job that has been defined up to this point to the cluster in order to execute it, and waits until it has retrieved the result. Once the result has been obtained, the Flink program execution continues. In other words, with collect you split your program up into several jobs. Cheers, Till On Fri, Jun 22, 2018 at 9:38 PM Garrett Barton <[hidden email]> wrote:
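To illustrate the point, a sketch with made-up sources and sinks (not the code from this thread): eagerly evaluated calls such as count() or collect() each trigger their own job submission, re-running whatever was defined before them, while env.execute() runs only the lazily declared sinks in one more submission.

import java.util.List;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class EagerCallsSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> lines = env.fromElements("a", "b", "", "c")
                .filter(value -> !value.isEmpty());

        // Job #1: count() is eager, so the pipeline above is submitted and run now.
        long nonEmpty = lines.count();

        // Job #2: collect() is eager too; another full submission of the same pipeline.
        List<String> sample = lines.collect();

        // Job #3: only now do the declared sinks run, in one more submission.
        lines.writeAsText("file:///tmp/eager-calls-sketch"); // hypothetical path
        env.execute("remaining sinks");

        System.out.println(nonEmpty + " non-empty lines, sample: " + sample);
    }
}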
|