Custom Partitioning and windowing questions/concerns

Custom Partitioning and windowing questions/concerns

Katsipoulakis, Nikolaos Romanos

Hello all,

 

I am currently examining the effects of stream partitioning on performance for simple stateful scenarios.

 

My toy application for the rest of this question is the following: a stream of non-negative integers, each annotated with a timestamp, where the goal is to compute the top-10 most frequent integers over tumbling windows of 10 seconds. In other words, my input is a stream of tuples with two fields, Tuple2<Long, Integer>(timestamp, key), where key is the non-negative integer value and timestamp is used to assign each event to a window. The execution plan I am considering has a first phase (Phase 1) in which the stream is partitioned and the partial aggregations are processed in parallel (parallelism set to N > 1). The second phase (Phase 2) then gathers all partial aggregations on a single node (parallelism set to 1), computes the full aggregation for each key, orders the keys by windowed frequency, and outputs the top-10 keys for each window.

 

As I mentioned earlier, my goal is to compare the performance of different partitioning policies on this toy application. Initially, I want to compare shuffle-grouping (round-robin) and hash-grouping and then move on to different partitioning policies by using Flink’s CustomPartitioner API. After reading Flink’s documentation, I managed to develop the toy application using hash-partitioning. Below, I present the different parts of my code:

 

// Phase 0: input setup
DataStream<Tuple3<Long, Integer, Integer>> stream = env.fromCollection(…)
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<Long, Integer>>() {
        @Override
        public long extractAscendingTimestamp(Tuple2<Long, Integer> event) { return event.f0; }
    })
    .map((Tuple2<Long, Integer> e) -> new Tuple3<Long, Integer, Integer>(e.f0, e.f1, 1));

 

In Phase 0, I collect the input stream from an in-memory list, define the event timestamp that will be used for windowing, and extend each event with a count of 1 for calculating the appearances of each number in every window. Afterwards, for the parallel Phase 1, I use hash partitioning by first applying the .keyBy() operation on the key of each tuple (i.e., field 1), followed by a .window() operation to assign each tuple to a window, and ending with a .sum(). My code for (parallel) Phase 1 is the following:

 

// Phase 1: parallel partial sum, with a parallelism of N (N > 1)
DataStream<Tuple3<Long, Integer, Integer>> phaseOne = stream
    .keyBy(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .sum(2)
    .setParallelism(N);

 

Moving on to Phase 2, to gather all partial results of a window in a single operator, produce the full aggregation, order by frequency, and return the top-10 keys, I have the following:

 

// Phase 2: serial full aggregation and ordering, with a parallelism of 1
DataStream<String> phaseTwo = phaseOne
    .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
    .apply(new AllWindowFunction<Tuple3<Long, Integer, Integer>, String, TimeWindow>() {
        @Override
        public void apply(TimeWindow window, Iterable<Tuple3<Long, Integer, Integer>> values, Collector<String> out) throws Exception {
            ...
            List<Integer> topTenValues = ...;
            StringBuilder strBuilder = new StringBuilder();
            for (Integer t : topTenValues) {
                strBuilder.append(t).append(",");
            }
            out.collect(strBuilder.toString());
        }
    });

 

The previous code uses hash-partitioning for its parallel phase. From what I understand, Flink allows the .window() operation only on a KeyedStream. Furthermore, the .partitionCustom() method transforms a DataStream into a plain DataStream (and the same holds for .shuffle(), which distributes events round-robin). Therefore, I am confused about how I can combine a shuffle policy with windows. One idea that came to me is to provide an irrelevant field to the .keyBy() method, or to define my own KeySelector<IN, KEY> that simulates shuffle grouping through key generation. Unfortunately, I have concerns about both alternatives: for the keyBy() approach, I would need to control the internal hashing mechanism, which entails cherry-picking fields for different workloads and exhaustively probing the behavior of different random fields (not practical); for the KeySelector<IN, KEY> approach, I would need to maintain state across different calls of getKey(), which (as far as I know) the KeySelector<IN, KEY> interface does not offer, and I do not want to rely on external state that would introduce additional overhead. Therefore, my first question is: how can I effectively use round-robin grouping with windows in my toy application?
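
To make these alternatives more concrete, here is a rough sketch of what I mean (for illustration only; N, the Tuple4 layout, and the PartialCountsPerPartition window function are placeholders I made up): the round-robin key is generated in a preceding map, which side-steps the stateless KeySelector restriction, and the window is then keyed on that synthetic field.

// imports assumed: org.apache.flink.api.common.functions.MapFunction, org.apache.flink.api.java.tuple.*
// Note: the synthetic key still goes through Flink's internal hashing, so the N
// generated keys are not guaranteed to map one-to-one onto the N parallel subtasks.
final int N = 4; // desired number of partitions (effectively final, captured by the map)
DataStream<Tuple4<Long, Integer, Integer, Integer>> withSyntheticKey = stream
    .map(new MapFunction<Tuple3<Long, Integer, Integer>, Tuple4<Long, Integer, Integer, Integer>>() {
        private int nextChannel = 0; // per-subtask, non-fault-tolerant counter

        @Override
        public Tuple4<Long, Integer, Integer, Integer> map(Tuple3<Long, Integer, Integer> e) {
            int channel = nextChannel;
            nextChannel = (nextChannel + 1) % N;
            return new Tuple4<>(e.f0, e.f1, e.f2, channel);
        }
    });

// The windowed pre-aggregation is then keyed on the synthetic field; a WindowFunction
// (rather than a plain sum(2)) would be needed to build per-value counts inside each partition.
withSyntheticKey
    .keyBy(3)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .apply(new PartialCountsPerPartition()); // hypothetical WindowFunction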

 

The bigger point I am trying to address concerns custom partitioning policies and windows in general. My understanding is that the benefit of a custom partitioning policy is the ability to control the partitioning process based on a pre-defined set of resources (e.g., partitions, task slots, etc.). Hence, I am confused about how I could use partitionCustom() followed by .window() in the (parallel) Phase 1 to test the performance of different execution plans (i.e., partitioning policies).

 

I apologize for the long question, but I believe I had to provide enough detail for the points and questions I currently have. Thank you very much for your time.

 

Kind Regards,  

 

Nikos R. Katsipoulakis,

Department of Computer Science

University of Pittsburgh

 

Re: Custom Partitioning and windowing questions/concerns

Fabian Hueske-2
Hi Nikos,

Flink's windows require a KeyedStream because they use the keys to manage their internal state (each in-progress window has some state that needs to be persisted and checkpointed).
Moreover, Flink's event-time window operators return a deterministic result. In your use case, the result of the pre-aggregation (Phase 1) would not be deterministic because it would depend on the partitioning of the input.

I would suggest implementing the pre-aggregation not with a window but with a ProcessFunction (available in Flink 1.2-SNAPSHOT, which will be released soon).
ProcessFunction allows you to register timers, which can be used to emit results every 10 seconds.
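
For illustration (a rough, untested sketch that assumes the stream has already been keyed by the integer value; the class name is a placeholder), the timer-based pre-aggregation could look roughly like this:

// imports assumed: org.apache.flink.streaming.api.functions.ProcessFunction,
// org.apache.flink.api.common.state.*, org.apache.flink.configuration.Configuration,
// org.apache.flink.util.Collector, org.apache.flink.api.java.tuple.Tuple3
public class PartialCount extends ProcessFunction<Tuple3<Long, Integer, Integer>, Tuple3<Long, Integer, Integer>> {

    private transient ValueState<Integer> key;    // key handled by this partition
    private transient ValueState<Integer> count;  // partial count of the current interval

    @Override
    public void open(Configuration parameters) {
        key = getRuntimeContext().getState(new ValueStateDescriptor<>("key", Integer.class));
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Integer.class));
    }

    @Override
    public void processElement(Tuple3<Long, Integer, Integer> value, Context ctx,
                               Collector<Tuple3<Long, Integer, Integer>> out) throws Exception {
        key.update(value.f1);
        Integer current = count.value();
        count.update(current == null ? value.f2 : current + value.f2);
        // fire at the end of the 10-second event-time interval that contains this event
        long windowEnd = value.f0 - (value.f0 % 10_000L) + 10_000L;
        ctx.timerService().registerEventTimeTimer(windowEnd - 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple3<Long, Integer, Integer>> out) throws Exception {
        Integer current = count.value();
        if (current != null) {
            out.collect(new Tuple3<>(timestamp, key.value(), current));
            count.clear();
        }
    }
}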

Hope this helps,
Fabian



RE: Custom Partitioning and windowing questions/concerns

Katsipoulakis, Nikolaos Romanos

Hello Fabian,

 

First, I would like to thank you for your suggestion and the additional information on determinism and partitioning policies. As I mentioned in my initial email, I am new to Flink, and every additional piece of advice makes my “learning curve” less steep. I am also aware that you (and everyone else following this thread) might wonder why I am following this unconventional path of partitioning for performance, but my use case's goal is academic in nature.

 

Turning to your suggestion, I took some time to go over the 1.2-SNAPSHOT code, and I read the online documentation on the ProcessFunction API at https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/process_function.html. From my understanding, the process() transformation can be applied only on a KeyedStream<T> and not on a DataStream<T>. Therefore, if I wanted to use a custom partitioning algorithm, I would first have to call partitionCustom() (DataStream<T> -> DataStream<T>), followed by keyBy(…) (DataStream<T> -> KeyedStream<T>), and finally apply my pre-aggregation step (i.e., a call to process()). Concretely, my code would turn into something like the following:

// Phase 1: parallel partial sum, with a parallelism of N (N > 1)
DataStream<Tuple3<Long, Integer, Integer>> phaseOne = stream
    .partitionCustom(new CustomPartitioner(...)) // or .rebalance() or .shuffle()
    .keyBy(1)
    .process(new CustomProcessFunction(..., Time.seconds(10), ...))
    .sum(2)
    .setParallelism(N);

 

Unfortunately, as you can see, the above would be problematic for two reasons. First, the call to keyBy() defeats the purpose of a custom partitioner, because the stream will ultimately be partitioned based on the keys and not by my CustomPartitioner.selectChannels() method. Second, using process() does not solve my problem, because the whole point of my use case is to avoid calling keyBy(); if I could accept keyBy(), I might as well call window() and not use the process API in the first place. To be more precise, if using a KeyedStream<T> were acceptable, I could simply do the following:

 

// Phase 1: parallel partial sum, with a parallelism of N (N > 1)
DataStream<Tuple3<Long, Integer, Integer>> phaseOne = stream
    .partitionCustom(new CustomPartitioner(...))
    .keyBy(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .sum(2)
    .setParallelism(N);

 

Therefore, I don’t think using a ProcessFunction would solve my problem. Am I understanding your suggestion correctly? If so, I would be grateful if you could explain it in more detail. On top of that, after re-reading my initial email, I believe the intentions of my use case were not entirely clear. Please do not hesitate to ask me for any clarifications.

 

Again, thank you very much for your interest and your time.

 

Kind Regards,

 

Nikos R. Katsipoulakis,

Department of Computer Science

University of Pittsburgh

 

Re: Custom Partitioning and windowing questions/concerns

Fabian Hueske-2
Hi Nikos,

You are of course right. I forgot that ProcessFunction requires a KeyedStream. Sorry for that advice.
The problem is that you need to implement some kind of time-based function that emits partial counts every 10 seconds.
AFAIK, the DataStream API does not offer a built-in operator that gives you this, except for windows and ProcessFunction.

You could try to implement your own operator by extending AbstractStreamOperator and implementing the OneInputStreamOperator interface.
This is a fairly low-level interface but gives you access to record timestamps and watermarks. Actually, the DataStream operators are built on this interface as well.
A custom operator is applied by calling dataStream.transform().
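
To give a rough, untested idea of what such an operator could look like (the names are mine, and the map-based state below is not checkpointed), it could pre-aggregate counts per value and flush them whenever the watermark passes a 10-second event-time boundary:

// imports assumed: org.apache.flink.streaming.api.operators.*,
// org.apache.flink.streaming.runtime.streamrecord.StreamRecord,
// org.apache.flink.streaming.api.watermark.Watermark, java.util.*
public class PartialCountOperator
        extends AbstractStreamOperator<Tuple3<Long, Integer, Integer>>
        implements OneInputStreamOperator<Tuple3<Long, Integer, Integer>, Tuple3<Long, Integer, Integer>> {

    private static final long WINDOW_SIZE = 10_000L;

    private final Map<Integer, Integer> counts = new HashMap<>();  // value -> partial count
    private long currentWindowEnd = Long.MIN_VALUE;

    @Override
    public void processElement(StreamRecord<Tuple3<Long, Integer, Integer>> element) throws Exception {
        Tuple3<Long, Integer, Integer> e = element.getValue();
        if (currentWindowEnd == Long.MIN_VALUE) {
            currentWindowEnd = e.f0 - (e.f0 % WINDOW_SIZE) + WINDOW_SIZE;
        }
        counts.merge(e.f1, e.f2, Integer::sum);
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // flush partial counts once event time has passed the current boundary
        while (currentWindowEnd != Long.MIN_VALUE && mark.getTimestamp() >= currentWindowEnd) {
            for (Map.Entry<Integer, Integer> entry : counts.entrySet()) {
                output.collect(new StreamRecord<>(
                        new Tuple3<>(currentWindowEnd - 1, entry.getKey(), entry.getValue()),
                        currentWindowEnd - 1));
            }
            counts.clear();
            currentWindowEnd += WINDOW_SIZE;
        }
        output.emitWatermark(mark); // forward the watermark downstream
    }
}

// Applied with something like (operator name and type hint are placeholders):
// stream.rebalance() // or a custom partitioning
//       .transform("partial-counts",
//                  TypeInformation.of(new TypeHint<Tuple3<Long, Integer, Integer>>() {}),
//                  new PartialCountOperator())
//       .setParallelism(N);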

Best,
Fabian



RE: Custom Partitioning and windowing questions/concerns

Katsipoulakis, Nikolaos Romanos

Hello Fabian,

 

Thank you for your response, and there is no need for apologies :) . As I mentioned in my previous email, my wording seemed confusing, so it was only to be expected that you had an incomplete picture of my goal. Again, thank you for your help and your time.

 

Moving on to my plan from this point, I understand that I might have to implement some custom components myself (I prefer conducting my research on an actual system over regressing to an awful simulation). To that end, I thought of implementing my own KeyedStream<T> variant that offers the option of using a StreamPartitioner<T> other than the HashPartitioner<T>. This CustomKeyedStream<T> would be created by a call to a custom method on DataStream<T>, say customKeyBy(int... fields, CustomPartitioner<T>), and it would work exactly like DataStream<T>.keyBy(int... fields), with the only difference being that it receives a custom partitioner instead of using the default hash partitioner. Do you think this plan is feasible? I am not completely sure whether the windowed key state would be affected by this design in any way.

 

In addition, I will consider your suggestion of extending AbstractStreamOperator and implementing OneInputStreamOperator. It looks like an easier route than the one I described above, and I will try to dive into its implementation details.

 

Again, thank you very much for your help and your constructive comments.

 

Kind Regards,   

 

Nikos R. Katsipoulakis,

Department of Computer Science

University of Pittsburgh

 


Re: Custom Partitioning and windowing questions/concerns

Fabian Hueske-2
Hi Nikos,

yes, the hash function is not only used for partitioning but also to organize the key-partitioned state.
My intuition is that the AbstractStreamOperator approach would be easier to realize, because you don't need to worry about side effects of changing Flink internals.

Best, Fabian
