Methods that trigger execution

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

Methods that trigger execution

Sebastian Neef
Hi,

I've heared of some methods that triggere an execution when using the
Batch API:

- print
- collect
- count
- execute

Some of them are discussed in older docs [0], but I can't find a good
list or hints in the newer ones. Are there any other methods?

Best regards,
Sebastian

[0]
https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html
Reply | Threaded
Open this post in threaded view
|

Re: Methods that trigger execution

Aljoscha Krettek
Hi,
Yes you’re right, there is no convenient list. Off the top of my head, your list seems exhaustive. (You could add printToErr()).

As a general remark, I don’t think it’s wise to use these methods when handling large amounts of data because they ship everything back to the client.

Best,
Aljoscha

> On 3. May 2017, at 12:24, Sebastian Neef <[hidden email]> wrote:
>
> Hi,
>
> I've heared of some methods that triggere an execution when using the
> Batch API:
>
> - print
> - collect
> - count
> - execute
>
> Some of them are discussed in older docs [0], but I can't find a good
> list or hints in the newer ones. Are there any other methods?
>
> Best regards,
> Sebastian
>
> [0]
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html

Reply | Threaded
Open this post in threaded view
|

Re: Methods that trigger execution

Borja
In reply to this post by Sebastian Neef
Hello,
I just reading about this, because I am developing my degree final project about how performance spark and flink.

I've developed a machine learning algorithm, and I want to trigger the execution in Flink.
When I do it with my code it takes around 5 minutes (all this time just in the collect() method) and Spark 35 seconds,
so I think I'm doing something wrong triggering the execution.

My code is:
// Create multiple linear regression learner
val mlr = MultipleLinearRegression()
.setIterations(10)
.setStepsize(0.3)
.setConvergenceThreshold(0.8)

// Fit the linear model to the provided data
val model = mlr.fit(data)

//Tigger the execution

val weights = mlr.weightsOption match {
  case Some(weights) => weights.collect()
  case None => throw new Exception("Could not calculate the weights.")
}

Is there a better way to trigger the execution?

Thank! :)
Reply | Threaded
Open this post in threaded view
|

Re: Methods that trigger execution

Aljoscha Krettek
Hi,

Using collect() (or print()) on a DataSet is almost never a good idea in Flink because this requires collecting all the data in one central place and sending it to the client. What you normally would do is write the data out to some file system (for example HDFS) and use env.execute() for actually running the program. Simply specifying a program like this:

ExecutionEnv env = …
DataSet<> input = env.read(…)
DataSet<> transformed = input.map(new MyMapFunction())
transformed.write(FileOutputFormat)

Does not actually execute anything, this just builds up an execution graph. Calling env.execute() is what actually ships the graph to a cluster and executes it in parallel.

Best,
Aljoscha

> On 5. Jun 2017, at 20:24, Borja <[hidden email]> wrote:
>
> Hello,
> I just reading about this, because I am developing my degree final project
> about how performance spark and flink.
>
> I've developed a machine learning algorithm, and I want to trigger the
> execution in Flink.
> When I do it with my code it takes around 5 minutes (all this time just in
> the collect() method) and Spark 35 seconds,
> so I think I'm doing something wrong triggering the execution.
>
> My code is:
> // Create multiple linear regression learner
> val mlr = MultipleLinearRegression()
> .setIterations(10)
> .setStepsize(0.3)
> .setConvergenceThreshold(0.8)
>
> // Fit the linear model to the provided data
> val model = mlr.fit(data)
>
> //Tigger the execution
>
> val weights = mlr.weightsOption match {
>  case Some(weights) => weights.collect()
>  case None => throw new Exception("Could not calculate the weights.")
> }
>
> Is there a better way to trigger the execution?
>
> Thank! :)
>
>
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Methods-that-trigger-execution-tp12972p13491.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Reply | Threaded
Open this post in threaded view
|

Re: Methods that trigger execution

Borja
Thank so much Aljoscha :)
I was stucked in this point. I didn't know that the print or collect method collecting all the data in one place.

The execution time has dropped a lot.
However, I still get that Flink is slower (just for 7 seconds).

I really think I'm not getting all the performance out of Flink.
Because Flink draws the execution in a cyclic dependency graph meanwhile Spark uses a DAG,
so it's clear that the Flin's way results in superior scalability and performance compared to DAG approach.

So... Which is the problem with my code?

//Read data
val data: DataSet[org.apache.flink.ml.common.LabeledVector] = MLUtils.readLibSVM(benv, "/inputPath/_.libsvm")

// Create multiple linear regression learner
val mlr = MultipleLinearRegression()

val model = mlr.fit(data)

data.writeAsText("file:///outputPath")

benv.execute()
Reply | Threaded
Open this post in threaded view
|

Re: Methods that trigger execution

Aljoscha Krettek
Hi,

I’m afraid I don’t know that part well enough. What’s the percentage in slowdown? (7 seconds alone doesn’t say anything)

Maybe Till (in cc) knows more since he used to work on the ML part.

Best,
Aljoscha

> On 6. Jun 2017, at 17:45, Borja <[hidden email]> wrote:
>
> *Thank so much Aljoscha* :)
> I was stucked in this point. I didn't know that the print or collect method
> collecting all the data in one place.
>
> The execution time has dropped a lot.
> However, I still get that Flink is slower (just for 7 seconds).
>
> I really think I'm not getting all the performance out of Flink.
> Because Flink draws the execution in a cyclic dependency graph meanwhile
> Spark uses a DAG,
> so it's clear that the Flin's way results in superior scalability and
> performance compared to DAG approach.
>
> So... Which is the problem with my code?
>
> //Read data
> val data: DataSet[org.apache.flink.ml.common.LabeledVector] =
> MLUtils.readLibSVM(benv, "/inputPath/_.libsvm")
>
> // Create multiple linear regression learner
> val mlr = MultipleLinearRegression()
>
> val model = mlr.fit(data)
>
> data.writeAsText("file:///outputPath")
>
> benv.execute()
>
>
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Methods-that-trigger-execution-tp12972p13537.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.