data sink stops method

Florian Heyl
Hi,
I need some help figuring out why one of the methods in my pipeline stops execution on HDFS.
I am working with the 0.10-SNAPSHOT and the code is below. The method stops at the call to collect (JoinPredictionAndOriginal.collect), which creates a data sink, so the program never gets to the two output files that should be written at the end. What am I missing?
Thank you for your time.

Best wishes,
Flo

// method calculates the prediction error (RMSE between rounded predictions and originals)
// imports needed by this snippet:
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector
import scala.math.{pow, sqrt}

def CalcPredError(predictions: DataSet[LabeledVector], original: DataSet[LabeledVector],
                  outputPath: String, outputPath2: String, outputPath3: String): (DataSet[LabeledVector], Double) = {

  var iter = 0

  // index each prediction and round its label to the nearest integer
  val transformPred = predictions
    .map { tuple =>
      iter = iter + 1
      LabeledVector(iter, DenseVector(BigDecimal(tuple.label).setScale(0, BigDecimal.RoundingMode.HALF_UP).toDouble))
    }

  iter = 0

  // index the original labels the same way so the two data sets can be joined
  val transformOrg = original
    .map { tuple =>
      iter = iter + 1
      LabeledVector(iter, DenseVector(tuple.label))
    }

  // join prediction and original on the index, keeping the pair of label values
  val JoinPredictionAndOriginal = transformPred.join(transformOrg).where(0).equalTo(0) {
    (l, r) => (l.vector.head._2, r.vector.head._2)
  }

  // collect is eager: it triggers execution of the plan up to this point
  val list_JoinPredictionAndOriginal = JoinPredictionAndOriginal.collect

  val N = list_JoinPredictionAndOriginal.length

  val residualSum = list_JoinPredictionAndOriginal.map {
    num => pow(num._1 - num._2, 2)
  }.sum

  // root mean squared error
  val predictionError = sqrt(residualSum / N)

  // lazy sinks: these are only declared here, not executed
  original.writeAsCsv(outputPath, "\n", " ", WriteMode.OVERWRITE)
  transformPred.writeAsCsv(outputPath2, "\n", " ", WriteMode.OVERWRITE)

  (predictions, predictionError)
}

Re: data sink stops method

Pieter Hameete
Hi Florian,

I believe that when you call JoinPredictionAndOriginal.collect, the environment executes your program up to that point. The CSV writes come after this point, so in order to execute these steps you have to call <env>.execute() after the CSV writes to trigger the execution (where <env> is the name of the variable pointing to your ExecutionEnvironment).
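
For illustration, a minimal sketch of that pattern (hypothetical names; assuming env is the ExecutionEnvironment and result is a DataSet of (Double, Double) pairs):

// collect() is eager: it runs the job up to this point and returns the data to the driver
val pairs: Seq[(Double, Double)] = result.collect()

// writeAsCsv only declares a sink; nothing happens until execute() is called
result.writeAsCsv("hdfs:///tmp/result.csv", "\n", " ", WriteMode.OVERWRITE)
env.execute()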

I hope this helps :-)

- Pieter

Re: data sink stops method

Stephan Ewen
Yes, sinks in Flink are lazy and do not trigger execution automatically. We made this choice to allow multiple concurrent sinks (splitting the streams and writing to many outputs concurrently). That requires an explicit execution trigger (env.execute()).

The exceptions are, as mentioned, the "eager" methods collect(), count() and print(). They need to be eager because the driver program needs, for example, the count() value before it can possibly progress.
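
As a sketch of what that design allows (hypothetical names; assuming env is the ExecutionEnvironment and data is an existing DataSet[LabeledVector]):

// two sinks in one job graph, written concurrently in a single run
val high = data.filter(_.label > 0.5)
val low = data.filter(_.label <= 0.5)
high.writeAsCsv("hdfs:///tmp/high.csv", "\n", " ", WriteMode.OVERWRITE)
low.writeAsCsv("hdfs:///tmp/low.csv", "\n", " ", WriteMode.OVERWRITE)
env.execute("two sinks, one trigger") // a single call executes both sinks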

Stephan

Re: data sink stops method

Florian Heyl
Hey Stephan and Pieter,
That is what I thought too, so I simply changed the code like this:

original.writeAsCsv(outputPath, "\n", " ", WriteMode.OVERWRITE)

env.execute()

transformPred.writeAsCsv(outputPath2, "\n", " ", WriteMode.OVERWRITE)

env.execute()

But it still does not execute the two commands.
Thank you for your time.

Flo

Re: data sink stops method

Till Rohrmann

Could you post a minimal example of your code where the problem is reproducible? I assume there has to be another problem, because env.execute() should actually trigger the execution.

Cheers,

Till
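
For reference, a self-contained sketch of the kind of minimal example that should produce an output file (paths and names are illustrative, not from the original thread):

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

object MinimalSinkExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromElements((1, "a"), (2, "b"), (3, "c"))

    // lazy sink: only declared here
    data.writeAsCsv("hdfs:///tmp/minimal-sink.csv", "\n", " ", WriteMode.OVERWRITE)

    // explicit trigger: without this call the file is never written
    env.execute("minimal sink example")
  }
}

If this writes the file but the full pipeline does not, comparing the two narrows down where the problem is.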