Flink program with for loop yields wrong results when run in parallel

Adrian Bartnik
Hi,

I have a Flink program that outputs wrong results once I set the parallelism to a value larger than 1.
If I run the program with parallelism 1, everything works fine.

The algorithm works on one input dataset, which is split iteratively until the desired output split size is reached.
How to split the cluster in each iteration is also determined iteratively.

Pseudocode:

val input = ... // the input DataSet

for (currentSplitNumber <- 1 to numberOfSplits) { // split the dataset until the desired number of splits is reached
    // Iteratively compute the best split
    val determinedSplit = ... // iteration involving input

    // Split the dataset into two smaller ones
    val tmpDataSet1 = determinedSplit.filter(x == 1) ...
    val tmpDataSet2 = determinedSplit.filter(x == 0) ...

    tmpDataSet1.count() // these calls are necessary to store the size of each split
    tmpDataSet2.count()

    // Store tmpDataSet1 and tmpDataSet2, as one of the next loop passes needs them (as the dataset to be split)
    ...

}
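
To make the structure concrete, here is a minimal sketch of how such a loop might look against the Scala DataSet API. It is not the actual program: the `Point` type, the placeholder rule inside `determineSplit`, the `run` helper, and the policy of always splitting the largest pending dataset are assumptions made only for illustration.

```scala
import org.apache.flink.api.scala._

object SplitLoopSketch {
  // Hypothetical record: a value in [0, 1) plus the side (0 or 1) chosen by the split search.
  case class Point(value: Double, side: Int)

  // Stand-in for the inner search, which the post does not spell out.
  // A bulk iteration relabels every point; this placeholder rule simply
  // halves the value range that belongs to the given depth of the split tree.
  def determineSplit(data: DataSet[Point], depth: Int): DataSet[Point] =
    data.iterate(3) { current =>
      current.map(p => p.copy(side = (p.value * (1 << depth)).toInt % 2))
    }

  // Runs the outer loop on the driver and returns the sizes of the final splits.
  def run(env: ExecutionEnvironment, numberOfSplits: Int): List[Long] = {
    val input: DataSet[Point] =
      env.fromCollection((0 until 100).map(i => Point(i / 100.0, 0)))

    // Driver-side bookkeeping: (dataset, depth in the split tree, size).
    var pending = List((input, 1, 100L))

    for (_ <- 1 to numberOfSplits) {
      // Assumption: the largest pending dataset is split next.
      val next = pending.maxBy(_._3)
      val labeled = determineSplit(next._1, next._2)

      val part1 = labeled.filter(_.side == 1)
      val part0 = labeled.filter(_.side == 0)

      // Each count() submits its own job and recomputes the plan from the source.
      val size1 = part1.count()
      val size0 = part0.count()

      pending = (part1, next._2 + 1, size1) :: (part0, next._2 + 1, size0) ::
        pending.filterNot(_ eq next)
    }
    pending.map(_._3)
  }

  def main(args: Array[String]): Unit =
    println(run(ExecutionEnvironment.getExecutionEnvironment, numberOfSplits = 3))
}
```

The inner search is a native bulk iteration (`iterate`), while the outer loop runs in the driver program, so the plan behind each stored dataset grows with every pass.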

It all comes down to 2 nested loops, one of which can be replaced by an iteration.
As nested iterations are not supported yet, I do not know how to avoid the outer loop.

Is this a known problem, and if yes, what would be a solution?

Best,
Adrian

Re: Flink program with for loop yields wrong results when run in parallel

Ufuk Celebi
Nested iterations are not supported via a "native iteration" operator.
There is no way to avoid the for loop at the moment.

I think it's not possible to tell why the results are wrong from the
code snippet. How do you propagate the counts back? In general I
expect this program to perform very badly for larger data sets,
because there is no support for caching intermediate results yet.
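
As an illustration of the cost point: every `count()` call triggers its own job, and without caching each job recomputes everything upstream. One possible way to bring both split sizes back to the driver with a single job is a grouped sum followed by `collect()`. This is only a sketch under assumptions: the `(value, side)` tuple layout and the `splitSizes` helper are hypothetical and not taken from the program in question.

```scala
import org.apache.flink.api.scala._

object SplitSizesSketch {
  // Returns the number of records per side (0 or 1) using one job.
  def splitSizes(labeled: DataSet[(Double, Int)]): Map[Int, Long] =
    labeled
      .map(r => (r._2, 1L)) // keep only the side and a counter
      .groupBy(0)           // group by side
      .sum(1)               // add up the counters per side
      .collect()            // triggers a single job and ships the rows to the driver
      .toMap

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Hypothetical labeled records: (value, side).
    val labeled: DataSet[(Double, Int)] =
      env.fromCollection((0 until 100).map(i => (i / 100.0, if (i < 30) 1 else 0)))

    val sizes = splitSizes(labeled)
    println(s"side 1: ${sizes.getOrElse(1, 0L)}, side 0: ${sizes.getOrElse(0, 0L)}")
  }
}
```

This does not remove the recomputation across loop passes; it only avoids running the same plan twice within one pass.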



On Mon, Jul 4, 2016 at 11:56 AM, Adrian Bartnik <[hidden email]> wrote:
> [original message quoted in full, see above]

Re: Flink program with for loop yields wrong results when run in parallel

Flavio Pompermaier

I also have a lot of use cases where caching a dataset would definitely be useful. Maybe using Alluxio (the new name of Tachyon) and writing intermediate results to an in-memory file system could be better, for the moment, than re-reading the input source over and over. What do you think?
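
A rough sketch of what I mean, writing an intermediate result out once and reading it back for later loop passes. The path, the tuple layout, and the `materialize` helper are hypothetical; the sketch uses a local `file://` path, and an in-memory file system would need its own URI scheme and setup, which I have not tried.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

object MaterializeSketch {
  // Writes the dataset to `path`, runs the job, and returns a dataset that
  // reads the stored copy instead of recomputing the original plan.
  def materialize(env: ExecutionEnvironment,
                  data: DataSet[(Double, Int)],
                  path: String): DataSet[(Double, Int)] = {
    data.writeAsCsv(path, writeMode = WriteMode.OVERWRITE)
    env.execute("materialize intermediate result")
    env.readCsvFile[(Double, Int)](path)
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Hypothetical intermediate result: (value, side).
    val split: DataSet[(Double, Int)] =
      env.fromCollection((0 until 100).map(i => (i / 100.0, i % 2)))

    // Hypothetical location for the stored copy.
    val reloaded = materialize(env, split, "file:///tmp/flink-splits/split-1")
    println(reloaded.count())
  }
}
```

Each later pass would then start from the stored copy, at the price of one extra write and read per materialized dataset.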

On 4 Jul 2016 12:25 p.m., "Ufuk Celebi" <[hidden email]> wrote:
> [previous reply and original message quoted in full, see above]