Hi,
We're trying to implement a module to help autoscale our pipeline, which is built with Flink on YARN. According to the documentation, the suggested procedure seems to be:

1. Cancel the job with a savepoint.
2. Start a new job with an increased YARN TM count and increased parallelism.

However, step 2 always fails with an error (stack trace truncated in the archive):

Caused by: java.lang.<exception truncated>
        at org.apache.flink.runtime.<frames truncated>
        at scala.concurrent.impl.Future$<frames truncated>
        at akka.dispatch.TaskInvocation.<frames truncated>
        at scala.concurrent.forkjoin.<frames truncated>

The procedure works fine if the parallelism is not changed.

I also want to mention that I didn't manually specify OperatorIDs in my job. The documentation does mention that manual OperatorID assignment is recommended. I'm just curious whether that is mandatory to fix the problem I'm seeing, given that my program doesn't change at all, so the auto-generated OperatorIDs should be unchanged after the parallelism increase?

Thanks,
Lei
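P.S. For reference, the exact commands we run are roughly the following (the savepoint directory, job ID, jar name, and parallelism/TM counts are placeholders for our setup, not literal values):

```shell
# Step 1: cancel the running job and write a savepoint.
# <job-id> and the target directory are placeholders.
bin/flink cancel -s hdfs:///flink/savepoints <job-id>

# Step 2: resubmit against YARN with more TaskManagers and higher
# parallelism, resuming from the savepoint written in step 1.
bin/flink run -m yarn-cluster -yn 4 \
    -s hdfs:///flink/savepoints/savepoint-<id> \
    -p 8 \
    our-pipeline.jar
```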
Hi Lei,
Which version of Flink would that be? I'm guessing >= 1.3.x? In Flink 1.1 the hash of an operator was tied to its parallelism, but starting with 1.2 that shouldn't happen anymore.

Also, are you changing the parallelism job-wide, or are there operators with differing parallelism? For example, could there be a source with parallelism 1, followed by an operator that also had parallelism 1 but now has a different parallelism?

Best,
Aljoscha
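As an aside on the OperatorID question: explicitly assigning IDs with `uid()` decouples savepoint state mapping from the auto-generated structure-based hashes. A minimal sketch of what that looks like in a DataStream job (the operator names, source, and logic here are purely illustrative, not taken from Lei's pipeline):

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("localhost", 9999)
            // Stable ID for the source: savepoint state for this operator
            // is matched by this string, not by an auto-generated hash.
            .uid("source-socket")
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String s) {
                    return s.toLowerCase();
                }
            })
            // Stable ID for the map operator.
            .uid("map-lowercase")
            .print();

        env.execute("uid-example");
    }
}
```

With explicit uids in place, the operator-to-state mapping stays fixed even if the job graph or parallelism changes between the savepoint and the restart.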
Hi Aljoscha,

I'm using version 1.3.0 and changing the job-wide parallelism.

Lei

On Thu, Oct 19, 2017 at 9:47 AM, Aljoscha Krettek <[hidden email]> wrote: