Strange behavior of DataStream.countWindow


Yukun Guo

I’m playing with the (Window)WordCount example from the Flink QuickStart. I generate a DataStream of 1000 digits (20 Strings of 50 digits each, split into single digits), which is windowed with a tumbling count window of 50 elements:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Random;

public class DigitCount {


    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> text = env.fromElements(
                "14159265358979323846264338327950288419716939937510",
                "58209749445923078164062862089986280348253421170679",
                "82148086513282306647093844609550582231725359408128",
                "48111745028410270193852110555964462294895493038196",
                "44288109756659334461284756482337867831652712019091",
                "45648566923460348610454326648213393607260249141273",
                "72458700660631558817488152092096282925409171536436",
                "78925903600113305305488204665213841469519415116094",
                "33057270365759591953092186117381932611793105118548",
                "07446237996274956735188575272489122793818301194912",
                "98336733624406566430860213949463952247371907021798",
                "60943702770539217176293176752384674818467669405132",
                "00056812714526356082778577134275778960917363717872",
                "14684409012249534301465495853710507922796892589235",
                "42019956112129021960864034418159813629774771309960",
                "51870721134999999837297804995105973173281609631859",
                "50244594553469083026425223082533446850352619311881",
                "71010003137838752886587533208381420617177669147303",
                "59825349042875546873115956286388235378759375195778",
                "18577805321712268066130019278766111959092164201989"
        );

        DataStream<Tuple2<Integer, Integer>> digitCount = text
                .flatMap(new Splitter())
                .keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
                    @Override
                    public Integer getKey(Tuple2<Integer, Integer> x) throws Exception {
                        return x.f0 % 2;
                    }
                })
                .countWindow(50)
                .sum(1);

        digitCount.print();
        env.execute();

    }

    public static final class Splitter implements FlatMapFunction<String, Tuple2<Integer, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) {
            for (String token : value.split("")) {
                if (token.length() == 0) {
                    continue;
                }
                out.collect(Tuple2.of(Integer.parseInt(token), 1));
            }
        }
    }
}

The code above produces 19 lines of output, which is reasonable: the 1000 digits are keyed into 2 partitions, one with slightly more than 500 elements and the other with slightly fewer, so each key is left with an incomplete window that never fires, and only 10 + 9 = 19 of the possible 20 windows are emitted.
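The arithmetic behind the 19 lines can be checked with a small stand-alone sketch. It assumes that a tumbling count window emits a result only when it is full, so a key with n elements produces n / 50 results (integer division). The class name and the 510/490 split are made up for illustration; the post only says the split is slightly uneven.

```java
// Stand-alone sketch (no Flink dependency); names and the 510/490 split are illustrative.
class CountWindowMath {

    // Number of results emitted when every key fires once per full window.
    static int firedWindows(int[] elementsPerKey, int windowSize) {
        int fired = 0;
        for (int n : elementsPerKey) {
            fired += n / windowSize; // integer division drops the incomplete last window
        }
        return fired;
    }

    public static void main(String[] args) {
        // Hypothetical uneven split of the 1000 digits into two keys: 10 + 9 windows.
        System.out.println(firedWindows(new int[] {510, 490}, 50));
        // A perfectly even split would fire all 20 windows.
        System.out.println(firedWindows(new int[] {500, 500}, 50));
    }
}
```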

So far so good, but if I replace the mod KeySelector with a random one:

private static class RandomKeySelector<T> implements KeySelector<T, Integer> {
    private int nPartitions;
    private Random random;

    RandomKeySelector(int nPartitions) {
        this.nPartitions = nPartitions;
        random = new Random();
    }

    @Override
    public Integer getKey(T dummy) throws Exception {
        return random.nextInt(this.nPartitions);
    }
}

and then

.keyBy(new RandomKeySelector<Tuple2<Integer, Integer>>(2))

it may generate 17 or 18 lines of output. How could that happen? Moreover, if I set the number of partitions to 10, in theory there should be no fewer than 11 lines of output, but in practice there can be as few as 9.

Please help me understand why countWindow behaves like this.
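The "no fewer than 11" figure follows from a short calculation: if every element stays with one key, each key can leave at most 49 elements in an unfinished window, so at least 1000 - 49 * k elements must sit in full windows. A sketch of that bound in plain Java (hypothetical class and method names, no Flink involved):

```java
// Stand-alone sketch of the lower bound; names are illustrative.
class MinWindows {

    // Lower bound on the number of full windows when `total` elements are spread
    // over `keys` keys and every key fires once per `size` elements.
    static int minFiredWindows(int total, int keys, int size) {
        int maxLeftover = keys * (size - 1);               // at most size-1 stragglers per key
        int inFullWindows = Math.max(0, total - maxLeftover);
        return (inFullWindows + size - 1) / size;          // round up to a whole window count
    }

    public static void main(String[] args) {
        System.out.println(minFiredWindows(1000, 10, 50)); // ceil(510 / 50) = 11
        System.out.println(minFiredWindows(1000, 2, 50));  // ceil(902 / 50) = 19
    }
}
```

By the same bound, two keys should never yield fewer than 19 lines, so an output of 17 or 18 lines already indicates that elements are not staying with a single key.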

Re: Strange behavior of DataStream.countWindow

Fabian Hueske-2
Hi Yukun,

The problem is that the KeySelector is internally invoked multiple times.
Hence it must be deterministic, i.e., it must extract the same key for the same object every time it is invoked.
The documentation does not discuss this aspect and should be extended.

Thanks for pointing out this issue.

Cheers,
Fabian
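
Why a random key causes trouble can be illustrated without Flink. The sketch below is a deliberately simplified model and not Flink's implementation: it assumes the selector is called once to route an element and once more to look up that element's window state, and it uses java.util.function.Function in place of Flink's KeySelector. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Function;

// Simplified model, not Flink code: Function stands in for KeySelector.
class KeyDeterminism {

    // Counts the elements for which two successive selector calls disagree.
    static <T> int mismatches(Function<T, Integer> selector, List<T> elements) {
        int count = 0;
        for (T element : elements) {
            int first = selector.apply(element);  // assumed: key used to route the element
            int second = selector.apply(element); // assumed: key used to find its window state
            if (first != second) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        List<Integer> digits = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            digits.add(i % 10);
        }
        Random random = new Random();
        Function<Integer, Integer> randomKey = d -> random.nextInt(2);
        Function<Integer, Integer> modKey = d -> d % 2;
        System.out.println("random key, disagreeing calls: " + mismatches(randomKey, digits));
        System.out.println("mod key, disagreeing calls:    " + mismatches(modKey, digits));
    }
}
```

With the mod key the two calls always agree; with the random key roughly half of the elements get two different keys, so in this model they would be counted under a key other than the one they were routed by.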
 

(In reply to Yukun Guo's message of 2016-06-09 13:19 GMT+02:00.)

Re: Strange behavior of DataStream.countWindow

Yukun Guo
Thx, now I use element.hashCode() % nPartitions and it works as expected.

But I'm afraid this is not best practice for simply turning a plain (already parallelized) DataStream into a KeyedStream, because it introduces overhead from physical repartitioning by key, which is unnecessary since I don't really care about keys.

(In reply to Fabian Hueske's message of 9 June 2016, 22:00.)

Re: Strange behavior of DataStream.countWindow

Fabian Hueske-2
If I understood you correctly, you want to compute windows in parallel without using a key.
Are you aware that the results of such a computation are not deterministic and kind of arbitrary?

If that is still OK for you, you can use a mapper to assign the current parallel index as a key field, i.e., wrap the data in a Tuple2<Key, Payload> and then do a keyBy(0). This will keep the data local. The mapper should extend RichMapFunction. You can access the parallel index via getRuntimeContext().getIndexOfThisSubtask().

Hope this helps.
Cheers, Fabian

(In reply to Yukun Guo's message of 2016-06-11 11:53 GMT+02:00.)

Re: Strange behavior of DataStream.countWindow

Edward
Hi Fabian -
I've tried this idea of creating a KeyedStream based on getRuntimeContext().getIndexOfThisSubtask(). However, not all target subtasks are receiving records.

All subtasks have a parallelism of 12, so I have 12 source subtasks and 12 target subtasks. I've confirmed that the values returned by getIndexOfThisSubtask are evenly distributed between 0 and 11. However, 4 out of the 12 target subtasks (the subtasks after the hash) are not receiving any data. This means it's not actually keeping all the data local, because at least 4 of the 12 partitions could be getting sent to different TaskManagers.

Do I need to do a .partitionCustom to ensure even/local distribution?

Thanks,
Edward

Re: Strange behavior of DataStream.countWindow

Fabian Hueske-2
Flink hashes the keys and computes the target partition using modulo. This works well if you have many keys, but leads to skew if the number of keys is close to the number of partitions.
If you use partitionCustom, you can explicitly define the target partition. However, partitionCustom does not return a KeyedStream, so you cannot use keyed state or windows there.

Not sure if that works for your use case, but you could try to use more keys to achieve a more uniform key distribution.

Best, Fabian
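
The skew with few keys can be reproduced in a small simulation. This is only a sketch: the mixing function is the MurmurHash3 32-bit finalizer, used here as a stand-in for whatever hash the runtime applies, and Flink's actual key-to-partition assignment differs in detail. Class and method names are hypothetical.

```java
import java.util.Set;
import java.util.TreeSet;

// Stand-alone simulation; not Flink's actual partitioning code.
class KeySkew {

    // Generic bit mixer (MurmurHash3 32-bit finalizer), standing in for the runtime's hash.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Partitions that receive at least one of the keys 0 .. numKeys-1.
    static Set<Integer> occupiedPartitions(int numKeys, int numPartitions) {
        Set<Integer> occupied = new TreeSet<>();
        for (int key = 0; key < numKeys; key++) {
            occupied.add(Math.floorMod(mix(key), numPartitions)); // hash, then modulo
        }
        return occupied;
    }

    public static void main(String[] args) {
        System.out.println("12 keys   -> " + occupiedPartitions(12, 12).size() + " of 12 partitions used");
        System.out.println("1200 keys -> " + occupiedPartitions(1200, 12).size() + " of 12 partitions used");
    }
}
```

With as many keys as partitions, collisions after hashing are likely, so some partitions tend to stay empty; with many more keys than partitions the load evens out.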

(In reply to Edward's message of 2017-06-23 15:34 GMT+02:00.)

Re: Strange behavior of DataStream.countWindow

Edward
Thanks, Fabian.
In this case, I could just extend your idea by creating some deterministic multiplier of the subtask index:

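      RichMapFunction<String, Tuple2<Integer,String>> keyByMap = new RichMapFunction<String, Tuple2<Integer,String>>() {
              @Override
              public Tuple2<Integer,String> map(String value) {
                // floorMod keeps the index in 0..3 even when hashCode() is negative
                int indexOfCounter = Math.floorMod(value.hashCode(), 4);
                // subtask s produces keys s, s + p, s + 2p, s + 3p (p = parallelism),
                // so no two subtasks share a key
                int key = getRuntimeContext().getIndexOfThisSubtask()
                        + indexOfCounter * getRuntimeContext().getNumberOfParallelSubtasks();
                counters.get(key).add(1); // counters is defined elsewhere (not shown)
                return new Tuple2<>(key, value);
            }
        };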

With this idea, if there are 12 subtasks, then subtask 0 would create 4 keys: 0, 12, 24, and 36.

The big advantage of your idea was that it would keep the data local. Is this still true with my example here (where I'm applying a function to the subtask index)? That is, if each partition is generating a unique set of keys (unique to that subtask), will it optimize to keep that set of keys local for the next downstream subtask?
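
The key layout described above (0, 12, 24 and 36 for subtask 0 of 12) corresponds to adding a multiple of the parallelism to the subtask index. A plain-Java sketch of that scheme, with hypothetical names, which also checks that no two subtasks share a key:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Stand-alone sketch of the key layout; no Flink dependency.
class SubtaskKeys {

    // Keys owned by one subtask: its index plus multiples of the parallelism.
    static int[] keysFor(int subtaskIndex, int parallelism, int keysPerSubtask) {
        int[] keys = new int[keysPerSubtask];
        for (int i = 0; i < keysPerSubtask; i++) {
            keys[i] = subtaskIndex + i * parallelism;
        }
        return keys;
    }

    // Number of distinct keys over all subtasks; equals parallelism * keysPerSubtask
    // exactly when no key is shared between subtasks.
    static int distinctKeys(int parallelism, int keysPerSubtask) {
        Set<Integer> all = new HashSet<>();
        for (int s = 0; s < parallelism; s++) {
            for (int key : keysFor(s, parallelism, keysPerSubtask)) {
                all.add(key);
            }
        }
        return all.size();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(keysFor(0, 12, 4))); // [0, 12, 24, 36]
        System.out.println(distinctKeys(12, 4));                // 48
    }
}
```

Unique keys per subtask only guarantee that the key sets do not overlap. Where the keyed records end up is a separate matter, since the target partition is derived from a hash of the key.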

Re: Strange behavior of DataStream.countWindow

Fabian Hueske-2
No, you will lose data locality if you use keyBy(), which is the only way to obtain a KeyedStream.

(In reply to Edward's message of 2017-06-23 17:52 GMT+02:00.)

Re: Strange behavior of DataStream.countWindow

Edward
So there is no way to do a countWindow(100) and preserve data locality?

My use case is this: augment a data stream with new fields from a DynamoDB lookup. DynamoDB allows batch gets of up to 100 records, so I am trying to collect 100 records before making that call. I have no other reason to do a repartitioning, so I am hoping to avoid incurring the cost of shipping all the data across the network to do this.

If I use countWindowAll, I am limited to parallelism = 1, so all data gets repartitioned twice. And if I use keyBy().countWindow(), then it gets repartitioned by key. So in both cases I lose locality.

Am I missing any other options?

Re: Strange behavior of DataStream.countWindow

Fabian Hueske-2
If the data does not have a key (or you do not care about it) you can also use a FlatMapFunction (or ProcessFunction) with Operator State. Operator State is not bound to a key but to a parallel operator instance. Have a look at the ListCheckpointed interface and its JavaDocs.
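
The batching logic that such a function would contain can be sketched in plain Java. This is only an illustration under stated assumptions: it leaves out the Flink interfaces and checkpointing entirely, the class and method names are hypothetical, and the callback stands in for the batched lookup call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch of per-instance batching; not a Flink operator.
class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> onBatch; // stand-in for the batched lookup call
    private final List<T> buffer = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> onBatch) {
        this.batchSize = batchSize;
        this.onBatch = onBatch;
    }

    // Called once per incoming record, the way flatMap() would be.
    void add(T record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            onBatch.accept(new ArrayList<>(buffer)); // hand over a copy of the full batch
            buffer.clear();
        }
    }

    // Records still waiting for a full batch; in a Flink job these are what
    // the operator state would need to snapshot.
    List<T> pending() {
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        BatchBuffer<Integer> batcher = new BatchBuffer<>(100, batch -> batchSizes.add(batch.size()));
        for (int i = 0; i < 250; i++) {
            batcher.add(i);
        }
        System.out.println("batches: " + batchSizes + ", pending: " + batcher.pending().size());
    }
}
```

Because the buffer belongs to one parallel instance, no keyBy() and hence no repartitioning is needed. A real job would also have to decide when to flush a partially filled buffer, for example on a timer or at the end of the input.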

(In reply to Edward's message of 2017-06-23 18:27 GMT+02:00.)