Hi,
I've heared of some methods that triggere an execution when using the Batch API: - collect - count - execute Some of them are discussed in older docs [0], but I can't find a good list or hints in the newer ones. Are there any other methods? Best regards, Sebastian [0] https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html |
Hi,
Yes you’re right, there is no convenient list. Off the top of my head, your list seems exhaustive. (You could add printToErr()). As a general remark, I don’t think it’s wise to use these methods when handling large amounts of data because they ship everything back to the client. Best, Aljoscha > On 3. May 2017, at 12:24, Sebastian Neef <[hidden email]> wrote: > > Hi, > > I've heared of some methods that triggere an execution when using the > Batch API: > > - collect > - count > - execute > > Some of them are discussed in older docs [0], but I can't find a good > list or hints in the newer ones. Are there any other methods? > > Best regards, > Sebastian > > [0] > https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html |
In reply to this post by Sebastian Neef
Hello,
I just reading about this, because I am developing my degree final project about how performance spark and flink. I've developed a machine learning algorithm, and I want to trigger the execution in Flink. When I do it with my code it takes around 5 minutes (all this time just in the collect() method) and Spark 35 seconds, so I think I'm doing something wrong triggering the execution. My code is: // Create multiple linear regression learner val mlr = MultipleLinearRegression() .setIterations(10) .setStepsize(0.3) .setConvergenceThreshold(0.8) // Fit the linear model to the provided data val model = mlr.fit(data) //Tigger the execution val weights = mlr.weightsOption match { case Some(weights) => weights.collect() case None => throw new Exception("Could not calculate the weights.") } Is there a better way to trigger the execution? Thank! :) |
Hi,
Using collect() (or print()) on a DataSet is almost never a good idea in Flink because this requires collecting all the data in one central place and sending it to the client. What you normally would do is write the data out to some file system (for example HDFS) and use env.execute() for actually running the program. Simply specifying a program like this: ExecutionEnv env = … DataSet<> input = env.read(…) DataSet<> transformed = input.map(new MyMapFunction()) transformed.write(FileOutputFormat) Does not actually execute anything, this just builds up an execution graph. Calling env.execute() is what actually ships the graph to a cluster and executes it in parallel. Best, Aljoscha > On 5. Jun 2017, at 20:24, Borja <[hidden email]> wrote: > > Hello, > I just reading about this, because I am developing my degree final project > about how performance spark and flink. > > I've developed a machine learning algorithm, and I want to trigger the > execution in Flink. > When I do it with my code it takes around 5 minutes (all this time just in > the collect() method) and Spark 35 seconds, > so I think I'm doing something wrong triggering the execution. > > My code is: > // Create multiple linear regression learner > val mlr = MultipleLinearRegression() > .setIterations(10) > .setStepsize(0.3) > .setConvergenceThreshold(0.8) > > // Fit the linear model to the provided data > val model = mlr.fit(data) > > //Tigger the execution > > val weights = mlr.weightsOption match { > case Some(weights) => weights.collect() > case None => throw new Exception("Could not calculate the weights.") > } > > Is there a better way to trigger the execution? > > Thank! :) > > > > -- > View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Methods-that-trigger-execution-tp12972p13491.html > Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com. |
Thank so much Aljoscha :)
I was stucked in this point. I didn't know that the print or collect method collecting all the data in one place. The execution time has dropped a lot. However, I still get that Flink is slower (just for 7 seconds). I really think I'm not getting all the performance out of Flink. Because Flink draws the execution in a cyclic dependency graph meanwhile Spark uses a DAG, so it's clear that the Flin's way results in superior scalability and performance compared to DAG approach. So... Which is the problem with my code? //Read data val data: DataSet[org.apache.flink.ml.common.LabeledVector] = MLUtils.readLibSVM(benv, "/inputPath/_.libsvm") // Create multiple linear regression learner val mlr = MultipleLinearRegression() val model = mlr.fit(data) data.writeAsText("file:///outputPath") benv.execute() |
Hi,
I’m afraid I don’t know that part well enough. What’s the percentage in slowdown? (7 seconds alone doesn’t say anything) Maybe Till (in cc) knows more since he used to work on the ML part. Best, Aljoscha > On 6. Jun 2017, at 17:45, Borja <[hidden email]> wrote: > > *Thank so much Aljoscha* :) > I was stucked in this point. I didn't know that the print or collect method > collecting all the data in one place. > > The execution time has dropped a lot. > However, I still get that Flink is slower (just for 7 seconds). > > I really think I'm not getting all the performance out of Flink. > Because Flink draws the execution in a cyclic dependency graph meanwhile > Spark uses a DAG, > so it's clear that the Flin's way results in superior scalability and > performance compared to DAG approach. > > So... Which is the problem with my code? > > //Read data > val data: DataSet[org.apache.flink.ml.common.LabeledVector] = > MLUtils.readLibSVM(benv, "/inputPath/_.libsvm") > > // Create multiple linear regression learner > val mlr = MultipleLinearRegression() > > val model = mlr.fit(data) > > data.writeAsText("file:///outputPath") > > benv.execute() > > > > -- > View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Methods-that-trigger-execution-tp12972p13537.html > Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com. |
Free forum by Nabble | Edit this page |