Flink stops deploying jobs on normal iteration


Flink stops deploying jobs on normal iteration

Nguyen Xuan Truong
Hi,

I have a Flink program similar to the K-means algorithm. I use a normal iteration (for loop) because Flink's native iteration does not allow computing intermediate results (in this case topDistance) within one iteration. The problem is that my program only runs when maxIterations is small. When maxIterations is large, the Flink jobs inside the for loop are never scheduled, deployed, or executed. The program hangs forever without any exception, error, or log message.

I ran the program in both local and cluster environments and hit the same issue. I also tried smaller inputs (points and seeds), with the same result.

Does anybody have an idea what the problem is? (Maybe the for loop creates too many Flink jobs?)

Here is the pseudo-code of my program:
 
val points: DataSet[Point] = env.readTextFile(inputPoints)
var seeds: DataSet[Point] = env.readTextFile(inputSeeds)
val discardNumber: Int = 100
val maxIterations: Int = 20 // maxIterations = 30 hangs the program: no Flink job inside the for loop is deployed

for (iteration <- 1 to maxIterations) {

      val intermediateSeeds = points
        .map()
        .withBroadcastSet(seeds, "seeds")

      // topDistance contains only one double value.
      val topDistance = intermediateSeeds
        .mapPartition()
        .first(discardNumber)
        .groupBy()
        .reduceGroup()

      val newSeeds = intermediateSeeds
        .map()
        .groupBy(0)
        .reduce().withBroadcastSet(topDistance, "topDistance")
        .map()

      seeds = newSeeds
}

val finalResult = seeds.collect()
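To make the loop body concrete, here is a plain-Scala, in-memory sketch of one iteration (no Flink), assuming 1-D points, absolute distance, and an invented interpretation of the elided operators: topDistance is the smallest of the discardNumber largest point-to-nearest-seed distances, and new seeds are averages of the points kept below that threshold. All helper names here are hypothetical, not from the original program:

```scala
type Point = Double

// Distance from a point to its closest seed (assumed metric).
def nearestSeedDistance(p: Point, seeds: Seq[Point]): Double =
  seeds.map(s => math.abs(p - s)).min

// One pass of the loop body: compute topDistance, then recompute seeds
// from the points whose distance falls below it.
def oneIteration(points: Seq[Point], seeds: Seq[Point], discardNumber: Int): (Double, Seq[Point]) = {
  val distances = points.map(p => (p, nearestSeedDistance(p, seeds)))
  // topDistance: the smallest of the `discardNumber` largest distances.
  val topDistance = distances.map(_._2).sorted(Ordering[Double].reverse).take(discardNumber).last
  // Keep the non-discarded points, assign each to its nearest seed, average per seed.
  val kept = distances.collect { case (p, d) if d < topDistance => p }
  val newSeeds = kept
    .groupBy(p => seeds.minBy(s => math.abs(p - s)))
    .valuesIterator
    .map(ps => ps.sum / ps.size)
    .toSeq
  (topDistance, newSeeds)
}
```

In a driver-side loop like the one above, each of these steps becomes part of a new Flink job per iteration, with no caching of intermediate results between jobs.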


Thanks,
Truong
Re: Flink stops deploying jobs on normal iteration

Vasiliki Kalavri
Hi Truong,

I'm afraid what you're experiencing is to be expected. Currently, for loops do not perform well in Flink, since there is no support for caching intermediate results yet. This has been a frequently requested feature lately, so maybe it will be added soon :)
Until then, I suggest you try implementing your logic using iterate or iterateDelta.

Cheers,
-Vasia.
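For reference, the bulk-iteration shape suggested here looks roughly like the sketch below in the Scala DataSet API. This is an API-shape sketch only, not a working port: the function names (parsePoint, AssignToNearestSeed, SumPointsAndCounts, AverageToNewSeed) are hypothetical placeholders, and computing and broadcasting topDistance per superstep is exactly the part this API does not support:

```scala
// Sketch only: Flink Scala DataSet API bulk iteration.
// Placeholders stand in for the real map/reduce bodies.
val initialSeeds: DataSet[Point] = env.readTextFile(inputSeeds).map(parsePoint _)

val finalSeeds = initialSeeds.iterate(maxIterations) { seeds =>
  points
    .map(new AssignToNearestSeed)       // hypothetical RichMapFunction
    .withBroadcastSet(seeds, "seeds")
    .groupBy(0)
    .reduce(new SumPointsAndCounts)     // hypothetical ReduceFunction
    .map(new AverageToNewSeed)          // hypothetical MapFunction
}

val finalResult = finalSeeds.collect()
```

The whole iteration then runs inside a single Flink job, so the plan is built once rather than once per loop pass.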

On 5 July 2016 at 17:11, Nguyen Xuan Truong <[hidden email]> wrote:

Re: Flink stops deploying jobs on normal iteration

Nguyen Xuan Truong
Hi Vasia,

Thank you very much for your explanation :). When running with a small maxIterations, the job graph that Flink executed was optimal. However, when maxIterations was large, Flink took a very long time to generate the job graph. The actual time to execute the jobs was very short, but the time to optimize and schedule them was long.

Regarding your suggestion: I didn't use iterate/iterateDelta because I need to access the intermediate results within an iteration (topDistance in my pseudo-code). As you said, Flink does not support that feature, so I wondered if you have a workaround for iterate or iterateDelta?

Thanks,
Truong 

On Tue, Jul 5, 2016 at 8:46 PM, Vasiliki Kalavri <[hidden email]> wrote:


Re: Flink stops deploying jobs on normal iteration

Vasiliki Kalavri
Hi Truong,

I guess the problem is that you want to use topDistance as a broadcast set inside the iteration? If I understand correctly this is a dataset with a single value, right? Could you maybe compute it with an aggregator instead?

-Vasia.
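One caveat with aggregators, illustrated below: a Flink iteration aggregator is accumulated during a superstep, but its value only becomes readable in the following superstep (via the runtime context's previous-iteration aggregate). A minimal plain-Scala emulation of that one-superstep lag (no Flink; names are invented for illustration):

```scala
// Emulates Flink's aggregator semantics: the value accumulated in
// superstep i is only visible to superstep i + 1.
def runSupersteps(values: Seq[Double], supersteps: Int): Seq[Option[Double]] = {
  var previousAggregate: Option[Double] = None   // what the previous-iteration aggregate would return
  val seen = Seq.newBuilder[Option[Double]]
  for (_ <- 1 to supersteps) {
    seen += previousAggregate    // what the step function can observe
    val acc = values.sum         // accumulated during this superstep
    previousAggregate = Some(acc) // published for the next superstep
  }
  seen.result()
}
```

So an aggregator-based topDistance would always be one iteration stale, which may or may not be acceptable depending on the algorithm.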

On 5 July 2016 at 21:48, Nguyen Xuan Truong <[hidden email]> wrote:



Re: Flink stops deploying jobs on normal iteration

Nguyen Xuan Truong
Hi Vasia,

You are right about topDistance: it is a dataset with only one double value. I already looked at aggregators, but I can only get an aggregator's value in the next iteration. My problem is a bit tricky because topDistance controls how newSeeds is calculated. I managed to speed up the normal iteration a bit, but I'm still curious whether there is any workaround to use the native Flink iteration?

Thanks,
Truong



On Thu, Jul 7, 2016 at 10:17 AM, Vasiliki Kalavri <[hidden email]> wrote: