Hello all, Currently, I examine the effects of stream partitioning on performance for simple state-full scenarios.
My toy application for the rest of my question will be the following: A stream of non-negative integers, each one annotated with a timestamp, and the goal is to get the top-10 most frequent non-negative integers on tumbling windows of 10
seconds. In other words, my input is a stream of tuples with two fields,
Tuple2<Long, Integer>(timestamp, key), where
key is the non-negative integer value, and
timestamp is used to assign each event to a window. The execution plan I am considering is to have a
first phase (Phase 1), where the stream is partitioned and the partial aggregations are processed in parallel (set parallelism to N > 1). Afterwards, the
second phase (Phase 2) involves gathering all partial aggregations on a single node (set parallelism to 1), and calculate the full aggregation for each key, order the keys based on windowed frequency and outputs the top-10 keys for each window.
As I mentioned earlier, my goal is to compare the performance of different partitioning policies on this toy application. Initially, I want to compare shuffle-grouping (round-robin) and hash-grouping and then move on to different partitioning
policies by using Flink’s CustomPartitioner API. After reading Flink’s documentation, I managed to develop the toy application using hash-partitioning. Below, I present the different parts of my code:
// Phase 0: input setup DataStream<Tuple3<Long, Integer, Integer>> stream = env.fromCollection(…) .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<Long, Integer>>() { @Override public long extractAscendingTimestamp(Tuple2<Long, Integer> event) { return event.f0; } }).map( (Tuple2<Long, Integer> e) -> new Tuple3<Long, Integer, Integer>(e.f0, e.f1, 1)); On Phase 0, I collect the input stream, from an in-memory list, define the event timestamp which will be used for windowing, and extend each event with a value of 1 for calculating the appearance of each number on every window. Afterwards,
for the parallel Phase 1, I use hash partitioning by first using
.keyBy() operation on the key of each tuple (i.e., field 1), followed by a
.window() operation, to assign each tuple on a different window, and end with a
.sum(). My code for (parallel) Phase 1 is the following:
// Phase 1: parallel partial sum, with a parallelism of N (N > 1) DataStream<Tuple3<Long, Integer, Integer> phaseOne = stream.keyBy(1).window(TumblingEventTimeWindows.of(Time.seconds(10)).sum(2).setParallelism(N); Moving on to Phase 2, to aggregate all partial results of a single window in one operator for producing the full aggregation, ordering based on frequency, and return the top-10 keys, I have the following:
// Phase 2: serial full aggregation and ordering, with a parallelism of 1 DataStream<String> phaseTwo = phaseOne .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)) .apply(new AllWindowsFunction<Tuple3<Long, Integer, Integer>, String, TimeWindow>() { @Override public void apply(TimeWindow window, Iterable<Tuple3<Long, Integer, Integer>> values, Collector<String> out) throws Exception { ... List<Integer> topTenValues = ...; StringBuilder strBuilder = new StringBuilder(); for (Integer t : topTenValues) strBuilder.append(Integer.toString(t) + “,”); out.collect(strBuilder.toString()); }); The previous code makes use of hash-partitioning for its parallel phase. From what I understand, Flink allows the
.window() operation only on a
KeyedStream. Furthermore, the .customPartition() method transforms a
DataStream to a
DataStream (and the same is true for .shuffle() which round-robins events). Therefore,
I am confused on how I can use a shuffle policy with windows. One Idea that came to me is to provide an irrelevant field on the
.keyBy() method, or define my own
KeySelector<IN, KEY> that will simulate shuffle grouping through key generation. Unfortunately, I have two concerns regarding the previous alternatives: For the
keyBy() approach, I need to control the internal hashing mechanisms, which entails cherry-picking fields on different workloads and performing an exhaustive search on the behavior of different random fields (not practical).
For the KeySelector<IN, KEY>approach, I need to maintain state among different calls of
getKey(), which (as far as I know) is not offered by the
KeySelector<IN, KEY> interface and I do not want to rely on external state that will lead to additional overhead. Therefore,
my first question is how will I be able to effectively use round-robin grouping with windows on my toy application? The bigger point I am trying to address revolves around custom partitioning policies and windows in general. My understanding is that the benefit of a custom partitioning policy is to have the ability to control the partitioning process
based on a pre-defined set of resources (e.g., partitions, task slots etc.). Hence,
I am confused on how I would be able to use partitionCustom() followed by
.window() on the (parallel) phase one, to test the performance of different execution plans (i.e., partitioning policies).
I apologize for the long question, but I believe that I had to provide enough details for the points/questions I currently have (highlighted with bold). Thank you very much for your time.
Kind Regards, Nikos R. Katsipoulakis, Department of Computer Science University of Pittsburgh |
Hi Nikos, Flink's windows require a KeyedStream because they use the keys to manage their internal state (each in-progress window has some state that needs to be persisted and checkpointed).I would suggest to implement the pre-aggregation not with a window but with a ProcessFunction (available in Flink 1.2-SNAPSHOT which will be release soon). ProcessFunction allows you to register timers which can be used to emit results every 10 seconds. Hope this helps, Fabian 2017-01-23 17:50 GMT+01:00 Katsipoulakis, Nikolaos Romanos <[hidden email]>:
|
Hello Fabian,
First, I would like to thank you for your suggestion and the additional information on determinism and partition policies. As I mentioned on my initial email, I am new to Flink
and every additional piece of advice makes my “learning curve” less steep. In addition, I am aware that you (and everyone else who follows this thread) might wonder why am I following this unconventional path of performance partitioning, but, I have to inform
you that my use-case’s goal is of academic nature. Turning to your suggestion, I took some time and go over version’s 1.2-SNAPSHOT code, and I read the online documentation on the Process Function API which I found at:
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/process_function.html . From my understanding, the
process() transformation can be applied only on a
KeyedStream<T> and not on a
DataStream<T>. Therefore, if I wanted to use a custom partition algorithm, I would have to first make a call to
partitionCustom() (DataStream<T> -> DataStream<T>), followed by a
keyBy(…) (DataStream<T> -> KeyedStream<T>), and finally apply my first pre-aggregation step (i.e., call to
process()). Concretely, my code would turn to something like the following:
// Phase 1: parallel partial sum, with a parallelism of N (N > 1) DataStream<Tuple3<Long, Integer, Integer> phaseOne = stream .partitionCustom(new CustomPartitioner(...)) // or .rebalance() or .shuffle() .keyBy(1) .process(new CustomProcessFunction(..., Time.seconds(10),...)) .sum(2).setParallelism(N); Unfortunately, you can understand that the above would be problematic for two reasons: First, a call to keyBy() defeats the purpose of a custom partitioner, because
stream will be (ultimately) partitioned based on the keys and not on my
CustomPartitioner.selectChannels() method. Second, using
process() does not solve my problem, because the issue with my use-case is to avoid calling
keyBy(). If I could do that, then I might as well call
window()and not use the process API in the first place. To be more precise, if I could use a
KeyedStream<T>, then I could do the following:
// Phase 1: parallel partial sum, with a parallelism of N (N > 1) DataStream<Tuple3<Long, Integer, Integer> phaseOne = stream .partitionCustom(new CustomPartitioner(...)) .keyBy(1) .window(TumblingEventTimeWindows.of(Time.seconds(10)) .sum(2).setParallelism(N); Therefore, I don’t think using a Process Function would solve my problem. Am I understanding your suggestion correctly? If yes, I would be grateful if you could explain to me in more detail.
On top of that, after reading my initial email again, I believe that the intentions for my use-case were not quite clear. Please, do not hesitate to ask me for any clarifications.
Again, thank you very much for your interest and your time.
Kind Regards,
Nikos R. Katsipoulakis,
Department of Computer Science University of Pittsburgh From: Fabian Hueske [mailto:[hidden email]]
Hi Nikos, Flink's windows require a KeyedStream because they use the keys to manage their internal state (each in-progress window has some state that needs to be persisted and checkpointed). Moreover, Flink's event-time window operators return a deterministic result. In your use-case, the result of the pre-aggregation (phase 1) should not deterministic because it would depend on the partitioning
of the input. I would suggest to implement the pre-aggregation not with a window but with a ProcessFunction (available in Flink 1.2-SNAPSHOT which will be release soon).
ProcessFunction allows you to register timers which can be used to emit results every 10 seconds. Hope this helps, Fabian 2017-01-23 17:50 GMT+01:00 Katsipoulakis, Nikolaos Romanos <[hidden email]>:
|
Hi Nikos, you are of course right. I forgot that ProcessFunction requires a KeyedStream. Sorry for this advice.This is a fairly low-level interface but gives you access to record timestamps and watermarks. Actually, the DataStream operators are built on this interface as well. 2017-01-24 17:18 GMT+01:00 Katsipoulakis, Nikolaos Romanos <[hidden email]>:
|
Hello Fabian,
Thank you for your response and there is no need for apologies
J . As I mentioned in my previous email, my wording seemed confusing and it was only expected that you had an incomplete picture
of my goal. Again, thank you for your help and your time. Moving on to my plan from this point on, I understand that I might have to implement some custom components myself (I prefer conducting my research on an actual system over
regressing back to an awful simulation). To that end, I thought of implementing my own
KeyedStream<T> implementation that provides the option of using a different
StreamPartitioner<T> other than the
HashPartitioner<T>. This
CustomKeyedStream<T> will be triggered by a call to a custom method offered by
DataStream<T> (let’s say)
customKeyBy(int... fields, CustomPartitioner<T>) and it will work exactly the same as
DataStream<T>.keyBy(int... fields), but with the only difference that it will receive a custom partitioner instead of using the
default hash partitioner. Do you think that this plan is feasible? I am not completely sure on whether the windowed key state be affected by the design in any way? In addition, I will consider your suggestion on extending the
AbstractStreamOperator and implementing the
OneInputStreamOperator. It looks like an easier way compared to the one I described above and I will try to dive into its implementation
details. Again, thank you very much for your help and your constructive comments. Kind Regards, Nikos R. Katsipoulakis,
Department of Computer Science University of Pittsburgh From: Fabian Hueske [mailto:[hidden email]]
Hi Nikos, you are of course right. I forgot that ProcessFunction requires a KeyedStream. Sorry for this advice. The problem is that you need need to implement some kind of time-based function that emits partial counts every 10 seconds. AFAIK, the DataStream API does not offers built-in operator that gives you this except for windows and ProcessFunction. You could try to implement your own operator by extending AbstractStreamOperator and implementing the OneInputStreamOperator interface.
A custom operator is applied by calling dataStream.transform(). Best, Fabian 2017-01-24 17:18 GMT+01:00 Katsipoulakis, Nikolaos Romanos <[hidden email]>:
|
Hi Nikos, yes, the hash function is not only used for partitioning but also to organize the key-partitioned state. My intuition is that the AbstractStreamOperator approach would be easier to realize, because you don't need to worry about side effects of changing Flink internals. 2017-01-25 18:50 GMT+01:00 Katsipoulakis, Nikolaos Romanos <[hidden email]>:
|
Free forum by Nabble | Edit this page |