Hi Truong,I'm afraid what you're experiencing is to be expected. Currently, for loops do not perform well in Flink since there is no support for caching intermediate results yet. This has been a quite often requested feature lately, so maybe it will be added soon :)Until then, I suggest you try implementing your logic using iterate or iterateDelta.Cheers,-Vasia.On 5 July 2016 at 17:11, Nguyen Xuan Truong <[hidden email]> wrote:Hi,I have a Flink program which is similar to Kmeans algorithm. I use normal iteration(for loop) because Flink iteration does not allow to compute the intermediate results(in this case the topDistance) within one iteration. The problem is that my program only runs when maxIteration is small. When the maxIterations is big, Flink jobs inside the forloop are not scheduled, deployed or executed. The program hangs forever without any exception, error or log message.I ran the program on both local and cluster environments, having the same issue. I tried with smaller inputs (points and seeds), having the same issue.Does anybody have an idea about what is the problem? (Maybe the forloop creates many Flink jobs?)Here is the pseudo-code of my program:DataSet[Point] points = env.readTextFile(inputPoints)DataSet[Point] seeds = env.readTextFile(inputSeeds)discardNumber: Int = 100maxIterations: Int = 20 // maxIteration = 30 will hang the program and no Flink job inside the forloop jobs is deployed)for(iteration <- 1 to maxIterations) {val intermediateSeeds = points.map().withBroadcastSet(seeds, "seeds")//topDistance contains only only double value.var topDistance = intermediateSeeds.mapPartition().first(discardNumber).groupBy().reduceGroup()val newSeeds = intermediateSeeds.map().groupBy(0).reduce ().withBroadcastSet(topDistance, "topDistance").map()seeds = newSeeds}val finalResult = seeds.collect()Thanks,Truong
Free forum by Nabble | Edit this page |