I’m playing with the (Window)WordCount example from Flink QuickStart. I generate a
The code above will produce 19 lines of output, which is reasonable: the 1000 digits are keyed into 2 partitions, where one partition contains 500+ elements and the other contains slightly fewer than 500 elements, so one incomplete 50-digit window is ignored. So far so good, but if I replace the mod KeySelector with a random one:
and then
it may generate 17 or 18 lines of output. How could that happen? Moreover, if I set the number of partitions to 10, in theory the lines of output should be no fewer than 11, but actually it can be only 9. Please help me understand why |
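The window arithmetic in this question can be checked with a small sketch. It assumes only what the post states: a count window fires once per 50 elements in a partition, and an incomplete window at the end produces no output. The class name, method names, and the 520/480 split are hypothetical and chosen for illustration; no Flink classes are involved.

```java
final class WindowCountBound {
    // Number of count windows that fire when partition i holds sizes[i] elements
    // and a window fires every windowSize elements; leftovers produce no output.
    static int windowsFired(int[] sizes, int windowSize) {
        int total = 0;
        for (int n : sizes) {
            total += n / windowSize; // integer division drops the incomplete window
        }
        return total;
    }

    // Lower bound on fired windows when `elements` are spread over `partitions`:
    // each partition can leave at most windowSize - 1 elements unfired, and the
    // fired elements always form a whole number of windows.
    static int lowerBound(int elements, int partitions, int windowSize) {
        int worstCaseLeftover = partitions * (windowSize - 1);
        int fired = Math.max(0, elements - worstCaseLeftover);
        return (fired + windowSize - 1) / windowSize; // ceiling division
    }

    public static void main(String[] args) {
        // Hypothetical split of 1000 elements over 2 partitions.
        System.out.println(windowsFired(new int[] {520, 480}, 50));
        System.out.println(lowerBound(1000, 2, 50));
        System.out.println(lowerBound(1000, 10, 50));
    }
}
```

Under these assumptions, any count below the lower bound means that the elements inside the windows do not add up to the elements that were sent, which is the symptom described in the question.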
Hi Yukun, the problem is that the KeySelector is internally invoked multiple times. Hence it must be deterministic, i.e., it must extract the same key for the same object each time it is invoked. The documentation does not discuss this aspect and should be extended. Thanks for pointing out this issue. Cheers, Fabian 2016-06-09 13:19 GMT+02:00 Yukun Guo <[hidden email]>:
|
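The determinism requirement can be illustrated without Flink. The sketch below is a plain-Java simulation, not Flink's KeySelector interface: it calls a key function twice per element and counts how often the two calls disagree. The class and method names are hypothetical, and the thread does not say at which points Flink invokes the selector, so the "two calls" are only a stand-in for "invoked multiple times".

```java
import java.util.Random;
import java.util.function.Function;

final class KeySelectorDeterminism {
    // Counts the elements for which two invocations of the selector disagree.
    static int mismatches(Function<Integer, Integer> selector, int elements) {
        int count = 0;
        for (int i = 0; i < elements; i++) {
            int first = selector.apply(i);  // e.g. the key seen by one invocation
            int second = selector.apply(i); // the key seen by a later invocation
            if (first != second) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        Random random = new Random(42);
        // Deterministic: the key depends only on the element.
        System.out.println(mismatches(x -> x % 2, 1000));
        // Random: the key ignores the element, so repeated calls can disagree.
        System.out.println(mismatches(x -> random.nextInt(2), 1000));
    }
}
```

Every mismatch corresponds to an element that would be treated as belonging to one key by one invocation and to another key by a later one, which is consistent with the varying output counts reported above.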
Thx, now I use element.hashCode() % nPartitions and it works as expected. But I'm afraid it's not a best practice for just turning a plain (already parallelized) DataStream into a KeyedStream, because it introduces some overhead due to physical repartitioning by key, which is unnecessary since I don't really care about keys. On 9 June 2016 at 22:00, Fabian Hueske <[hidden email]> wrote:
|
If I understood you correctly, you want to compute windows in parallel without using a key. Are you aware that the results of such a computation are not deterministic and kind of arbitrary? 2016-06-11 11:53 GMT+02:00 Yukun Guo <[hidden email]>:
|
Hi Fabian -
I've tried this idea of creating a KeyedStream based on getRuntimeContext().getIndexOfThisSubtask(). However, not all target subtasks are receiving records. All subtasks have a parallelism of 12, so I have 12 source subtasks and 12 target subtasks. I've confirmed that the call to getIndexOfThisSubtask is evenly distributed between 0 and 11. However, 4 out of the 12 target subtasks (the subtasks after the hash) are not receiving any data. This means it's not actually keeping all the data local, because at least 4 of the 12 partitions could be getting sent to different TaskManagers. Do I need to do a .partitionCustom to ensure even/local distribution? Thanks, Edward |
Flink hashes the keys and computes the target partition using modulo. This works well if you have many keys, but leads to skew if the number of keys is close to the number of partitions. If you use partitionCustom, you can explicitly define the target partition. However, partitionCustom does not return a KeyedStream, so you cannot use keyed state or windows there. Not sure if that works for your use case, but you could try to use more keys to achieve a more uniform key distribution. Best, Fabian 2017-06-23 15:34 GMT+02:00 Edward <[hidden email]>: |
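The skew described here can be reproduced with a rough simulation of "hash, then modulo". This is a sketch under stated assumptions: the mixing function is the MurmurHash3 finalizer used as a stand-in, not Flink's actual hash, and the real assignment inside Flink may involve further steps. All names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

final class KeySkewSketch {
    // Stand-in bit mixer (MurmurHash3 finalizer). An assumption for illustration,
    // not the hash function Flink uses.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // "Hash the key, then take it modulo the number of partitions."
    static int targetPartition(int key, int partitions) {
        return Math.floorMod(mix(key), partitions);
    }

    // How many distinct partitions receive data for keys 0..numKeys-1.
    static int distinctTargets(int numKeys, int partitions) {
        Set<Integer> targets = new HashSet<>();
        for (int key = 0; key < numKeys; key++) {
            targets.add(targetPartition(key, partitions));
        }
        return targets.size();
    }

    public static void main(String[] args) {
        // Few keys: some of the 12 partitions may stay empty.
        System.out.println("12 keys   -> " + distinctTargets(12, 12) + " partitions used");
        // Many keys: collisions matter less and the spread evens out.
        System.out.println("1200 keys -> " + distinctTargets(1200, 12) + " partitions used");
    }
}
```

With as many keys as partitions, hashed keys tend to collide and leave some partitions unused, which would match the 4 idle subtasks out of 12 reported above. Using more keys, as suggested, makes an empty partition much less likely.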
Thanks, Fabian.
In this case, I could just extend your idea by creating some deterministic multiplier of the subtask index:

    RichMapFunction<String, Tuple2<Integer,String>> keyByMap = new RichMapFunction<String, Tuple2<Integer,String>>() {
        public Tuple2<Integer,String> map(String value) {
            int indexOfCounter = Math.abs(value.hashCode()) % 4;
            int key = ((getRuntimeContext().getIndexOfThisSubtask() + 1) * (indexOfCounter + 1)) - 1;
            counters.get(key).add(1);
            return new Tuple2<>(key, value);
        }
    };

With this idea, if there are 12 subtasks, then subtask 0 would create 4 keys: 0, 12, 24, and 36. The big advantage of your idea was that it would keep the data local. Is this still true with my example here (where I'm applying a function to the subtask index)? That is, if each partition is generating a unique set of keys (unique to that subtask), will it optimize to keep that set of keys local for the next downstream subtask? |
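Working the key formula from this post through by hand suggests that it does not produce the keys listed: for subtask 0 it appears to yield 0, 1, 2, 3, and different subtasks can end up sharing keys. The sketch below evaluates the posted formula next to a strided variant (subtask index plus parallelism times counter index), which is a hypothetical alternative that would yield 0, 12, 24, 36. All names are illustrative and no Flink classes are used.

```java
import java.util.Arrays;

final class SubtaskKeys {
    // Keys from the formula in the post: ((subtaskIndex + 1) * (counter + 1)) - 1.
    static int[] keysFromPost(int subtaskIndex, int countersPerSubtask) {
        int[] keys = new int[countersPerSubtask];
        for (int c = 0; c < countersPerSubtask; c++) {
            keys[c] = (subtaskIndex + 1) * (c + 1) - 1;
        }
        return keys;
    }

    // Hypothetical strided variant: subtaskIndex + parallelism * counter.
    // Keys of different subtasks differ modulo parallelism, so they never overlap.
    static int[] keysStrided(int subtaskIndex, int parallelism, int countersPerSubtask) {
        int[] keys = new int[countersPerSubtask];
        for (int c = 0; c < countersPerSubtask; c++) {
            keys[c] = subtaskIndex + parallelism * c;
        }
        return keys;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(keysFromPost(0, 4)));    // [0, 1, 2, 3]
        System.out.println(Arrays.toString(keysFromPost(1, 4)));    // [1, 3, 5, 7]
        System.out.println(Arrays.toString(keysStrided(0, 12, 4))); // [0, 12, 24, 36]
        System.out.println(Arrays.toString(keysStrided(1, 12, 4))); // [1, 13, 25, 37]
    }
}
```

Unique keys per subtask do not by themselves imply local delivery, since keyBy() still routes each key by its hash.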
No, you will lose data locality if you use keyBy(), which is the only way to obtain a KeyedStream. 2017-06-23 17:52 GMT+02:00 Edward <[hidden email]>: |
So there is no way to do a countWindow(100) and preserve data locality?
My use case is this: augment a data stream with new fields from a DynamoDB lookup. DynamoDB allows batch gets of up to 100 records, so I am trying to collect 100 records before making that call. I have no other reason to do a repartitioning, so I am hoping to avoid incurring the cost of shipping all the data across the network to do this. If I use countWindowAll, I am limited to parallelism = 1, so all data gets repartitioned twice. And if I use keyBy().countWindow(), then it gets repartitioned by key. So in both cases I lose locality. Am I missing any other options? |
If the data does not have a key (or you do not care about it) you can also use a FlatMapFunction (or ProcessFunction) with Operator State. Operator State is not bound to a key but to a parallel operator instance. Have a look at the ListCheckpointed interface and its JavaDocs. 2017-06-23 18:27 GMT+02:00 Edward <[hidden email]>: |
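The batching logic behind this suggestion might look roughly like the following. It is a plain-Java sketch with no Flink dependency, and BatchBuffer and its methods are hypothetical names. In an actual job, add() would presumably be called from flatMap(), while snapshot() and restore() mirror the role of the snapshotState and restoreState methods of ListCheckpointed, so that a partially filled batch survives a failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Collects records locally (per parallel instance) and hands them off in
// fixed-size batches, e.g. for a batched lookup of up to 100 records.
final class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> onBatch;
    private List<T> pending = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> onBatch) {
        this.batchSize = batchSize;
        this.onBatch = onBatch;
    }

    // Buffers one record and emits a batch once batchSize records are collected.
    void add(T record) {
        pending.add(record);
        if (pending.size() >= batchSize) {
            List<T> full = pending;
            pending = new ArrayList<>();
            onBatch.accept(full);
        }
    }

    // Copy of the records not yet emitted; this is what would be checkpointed.
    List<T> snapshot() {
        return new ArrayList<>(pending);
    }

    // Replaces the buffer contents with previously checkpointed records.
    void restore(List<T> state) {
        pending = new ArrayList<>(state);
    }
}
```

Because the buffer belongs to one parallel instance rather than to a key, no keyBy() is needed, and records stay on the subtask that received them. A real implementation would also need some way to flush a partial batch, for example on a timeout, which this sketch leaves out.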