Dataset.distinct - Question on deterministic results


Dataset.distinct - Question on deterministic results

Will Bastian
I'm operating on a data set with some challenges to overcome. They are:
  1. There is the possibility of multiple entries for a single key,
    and
  2. For a single key, there may be multiple unique value-tuples.
For example:
key, val1, val2, val3
1,      0,    0,    0
1,      0,    0,    0
1,      1,    0,    0
2,      1,    1,    1
2,      1,    1,    1
2,      1,    1,    0
1,      0,    0,    0

I've found, when executing mySet.distinct(_.key) on the above, that my final results suggest distinct isn't always pulling the same record/value-tuple on every run.

Fully understanding that the use of distinct I've outlined above isn't optimal (we don't know, or care, which value-tuple we get; we just want it to be consistent on each run), I wanted to validate whether what I believe I'm observing is accurate. Specifically, in this example, is Flink reducing by key with no concern for value, and can we expect to pull back different instances on each distinct call?
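
For concreteness, here is a minimal sketch of the above as a Scala DataSet program (the Row case class and its field names are just a hypothetical encoding of the example data):

    import org.apache.flink.api.scala._

    // Hypothetical case class mirroring the example rows above
    case class Row(key: Int, val1: Int, val2: Int, val3: Int)

    val env = ExecutionEnvironment.getExecutionEnvironment

    val mySet: DataSet[Row] = env.fromElements(
      Row(1, 0, 0, 0), Row(1, 0, 0, 0), Row(1, 1, 0, 0),
      Row(2, 1, 1, 1), Row(2, 1, 1, 1), Row(2, 1, 1, 0),
      Row(1, 0, 0, 0)
    )

    // Which Row survives per key can differ from run to run
    mySet.distinct(_.key).print()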

Thanks,
Will

Re: Dataset.distinct - Question on deterministic results

Fabian Hueske-2
Hi Will,

The distinct operator is implemented as a groupBy(distinctKeys) followed by a ReduceFunction that returns its first argument.
Hence, which record is kept depends on the order in which the records are processed by the ReduceFunction.
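
As a rough sketch (reusing the hypothetical Row type from the question above), distinct(_.key) behaves approximately like:

    // Group by the distinct key and keep whichever record the
    // ReduceFunction happens to see first.
    val deduped: DataSet[Row] =
      mySet
        .groupBy(_.key)
        .reduce((first, second) => first) // "first" depends on arrival order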

Flink does not maintain a deterministic order because doing so is quite expensive in a distributed system.
A few aspects result in non-deterministic ordering:
- lazy split assignment
- combiners (which are automatically added for ReduceFunctions)
- network shuffles

There are two ways to address this issue:
1) Fully sort the input of the combiners and reducers on all attributes (sketched after this list).
2) Use a custom ReduceFunction that compares both input records on all (non-distinct-key) fields to determine which record to return (also sketched below).
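
A sketch of the first approach, again assuming the hypothetical Row type from the question (sortGroup orders each group on all value fields, so first(1) picks the same record on every run):

    import org.apache.flink.api.common.operators.Order

    val sortedDistinct: DataSet[Row] =
      mySet
        .groupBy(_.key)
        // Sort each group on all non-key fields...
        .sortGroup(r => (r.val1, r.val2, r.val3), Order.ASCENDING)
        // ...then keep the first record of each sorted group.
        .first(1)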

I would go for the second approach because it is more efficient (no need to fully sort before the combiner).
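
A sketch of the second approach with the same hypothetical Row type; any total order over the non-key fields works, here a field-by-field comparison:

    // Per key, keep the record with the smallest (val1, val2, val3)
    // tuple. Because this "min" is associative and commutative, the
    // automatically added combiner cannot change the result.
    val deterministicDistinct: DataSet[Row] =
      mySet
        .groupBy(_.key)
        .reduce { (a, b) =>
          val keepA =
            if (a.val1 != b.val1) a.val1 < b.val1
            else if (a.val2 != b.val2) a.val2 < b.val2
            else a.val3 <= b.val3
          if (keepA) a else b
        }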

Best, Fabian

Re: Dataset.distinct - Question on deterministic results

Will Bastian
Fabian,
Thanks for the clear response. You answered my question, and the suggestions give clear direction on how to address it.

Best,
Will

