Window apply problem

Marcela Charfuelan
Hello,

I want to write a function that counts items (per type) in windows of
size N. For example, for N=5 and the stream:
1 2 4 3 4 3 4 5 4 6 7 3 3 6 1 1 3 2 4 6

I would like to generate the following (item, count) tuples, where the
counts accumulate over all windows seen so far:
w(1 2 4 3 4) -> (1,1)(2,1)(4,2)(3,1)
w(3 4 5 4 6) -> (1,1)(2,1)(4,4)(3,2)(5,1)(6,1)
w(7 3 3 6 1) -> (1,2)(2,1)(4,4)(3,4)(5,1)(6,2)(7,1)
w(1 3 2 4 6) -> (1,3)(2,2)(4,5)(3,5)(5,1)(6,3)(7,1)
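The expected output above is cumulative: each line lists running totals over all items seen so far, not the counts within that window alone (item 4 occurs twice in the first window and twice in the second, hence (4,4) on the second line). A minimal plain-Java sketch of that semantics, without Flink, assuming tumbling windows of N consecutive items; the class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RunningWindowCounts {

    // Splits the stream into consecutive, non-overlapping windows of n items and,
    // after each complete window, records a snapshot of the running per-item counts.
    // A trailing partial window (fewer than n items) produces no snapshot.
    static List<Map<Integer, Integer>> runningCounts(int[] stream, int n) {
        List<Map<Integer, Integer>> snapshots = new ArrayList<>();
        Map<Integer, Integer> totals = new LinkedHashMap<>(); // keeps first-seen order
        for (int i = 0; i + n <= stream.length; i += n) {
            for (int j = i; j < i + n; j++) {
                totals.merge(stream[j], 1, Integer::sum);
            }
            snapshots.add(new LinkedHashMap<>(totals)); // copy, so later windows don't alter it
        }
        return snapshots;
    }

    public static void main(String[] args) {
        int[] stream = {1, 2, 4, 3, 4, 3, 4, 5, 4, 6, 7, 3, 3, 6, 1, 1, 3, 2, 4, 6};
        for (Map<Integer, Integer> snapshot : runningCounts(stream, 5)) {
            System.out.println(snapshot);
        }
    }
}
```

Running `main` prints one map per window, from `{1=1, 2=1, 4=2, 3=1}` for the first window to `{1=3, 2=2, 4=5, 3=5, 5=1, 6=3, 7=1}` for the last, matching the tuples listed above. The `LinkedHashMap` is used only so that the printed order follows first appearance, as in the listing.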

I am trying to apply my own function with "Window apply", something like:

items
    .windowAll(GlobalWindows.create())
    .trigger(CountTrigger.of(5))
    .apply(new MyWindowFunction());

but in this case there is a type parameter mismatch between apply and
WindowFunction, so I am not sure whether this is possible here. Any suggestions?
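One common cause of such a mismatch: in the Flink DataStream API of this period, `windowAll(...)` returns an `AllWindowedStream`, whose `apply` expects an `AllWindowFunction` (window, elements, collector), whereas `keyBy(...)` followed by `window(...)` or `timeWindow(...)` returns a `WindowedStream`, whose `apply` expects a `WindowFunction` that additionally receives the key. The plain-Java sketch below mimics the two shapes with hypothetical, simplified interfaces (`AllWindowFn`, `KeyedWindowFn`; no Flink dependency, and a `List` in place of Flink's `Collector`) to show why a function written for one shape cannot be passed where the other is expected:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class WindowFunctionShapes {

    // Hypothetical, simplified non-keyed shape: (window, elements, output).
    interface AllWindowFn<IN, OUT> {
        void apply(String window, Iterable<IN> elements, List<OUT> out);
    }

    // Hypothetical, simplified keyed shape: the key is an extra first parameter.
    interface KeyedWindowFn<IN, OUT, KEY> {
        void apply(KEY key, String window, Iterable<IN> elements, List<OUT> out);
    }

    // Non-keyed: all elements of the window are passed to a single call.
    static <IN, OUT> List<OUT> runAll(List<IN> window, AllWindowFn<IN, OUT> fn) {
        List<OUT> out = new ArrayList<>();
        fn.apply("w0", window, out);
        return out;
    }

    // Keyed: elements are grouped by key first, then the function is called once per key.
    static <IN, OUT, KEY> List<OUT> runKeyed(List<IN> window, Function<IN, KEY> keyOf,
                                             KeyedWindowFn<IN, OUT, KEY> fn) {
        Map<KEY, List<IN>> groups = new LinkedHashMap<>();
        for (IN e : window) {
            groups.computeIfAbsent(keyOf.apply(e), k -> new ArrayList<>()).add(e);
        }
        List<OUT> out = new ArrayList<>();
        for (Map.Entry<KEY, List<IN>> group : groups.entrySet()) {
            fn.apply(group.getKey(), "w0", group.getValue(), out);
        }
        return out;
    }

    // Counting with the non-keyed shape: the function builds its own per-item map.
    static List<Map<Integer, Integer>> countAll(List<Integer> window) {
        return runAll(window, (w, elements, out) -> {
            Map<Integer, Integer> counts = new LinkedHashMap<>();
            for (Integer e : elements) counts.merge(e, 1, Integer::sum);
            out.add(counts);
        });
    }

    // Counting with the keyed shape: grouping by item already happened, so just count.
    static List<String> countKeyed(List<Integer> window) {
        return runKeyed(window, (Integer e) -> e,
                (Integer key, String w, Iterable<Integer> elements, List<String> out) -> {
                    int n = 0;
                    for (Integer ignored : elements) n++;
                    out.add("(" + key + "," + n + ")");
                });
    }

    public static void main(String[] args) {
        List<Integer> window = Arrays.asList(1, 2, 4, 3, 4);
        System.out.println(countAll(window));   // [{1=1, 2=1, 4=2, 3=1}]
        System.out.println(countKeyed(window)); // [(1,1), (2,1), (4,2), (3,1)]
    }
}
```

The non-keyed function has to do its own grouping, while the keyed one is handed one key at a time; the extra key parameter is exactly what makes the two signatures incompatible.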

Looking at the streaming Java examples, the (commented-out) apply example
in GroupedProcessingTimeWindowExample, which is applied to a timeWindow,
does not work either:

.keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
.timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
.apply(new SummingWindowFunction())
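The two-argument `timeWindow(size, slide)` in this snippet defines sliding windows: each window covers 2500 ms and a new window starts every 500 ms, so every element falls into 2500 / 500 = 5 overlapping windows, and the window function is invoked once per key and window. A plain-Java sketch of that assignment, assuming windows aligned to multiples of the slide (no offset) and non-negative timestamps; `windowStarts` is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowAssignment {

    // Returns the start timestamps of every sliding window [start, start + size)
    // that contains timestamp t, assuming windows are aligned to multiples of slide.
    static List<Long> windowStarts(long t, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = t - (t % slide); // latest window start not after t (t >= 0)
        for (long start = lastStart; start > t - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // An element at t = 3200 ms, with size 2500 ms and slide 500 ms.
        System.out.println(windowStarts(3200, 2500, 500)); // [3000, 2500, 2000, 1500, 1000]
    }
}
```

Each returned start corresponds to one window [start, start + 2500) that contains the element, so a summing function sees that element five times, once in each overlapping window.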

So what am I missing here? Any help is appreciated.

Regards,
Marcela.




Re: Window apply problem

Wang Yangjun
Hello Marcela,

I am not sure what the “parameter mismatch” is here. From the example you showed, it seems that you just want to do a windowed word count, right?

Could you try this code and see whether it is what you want?

Best,
Jun
-------------------------------------------------
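import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

// (inside a main method declared with "throws Exception")
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
Integer[] array = new Integer[]{1, 2, 4, 3, 4, 3, 4, 5, 4, 6, 7, 3, 3, 6, 1, 1, 3, 2, 4, 6};
List<Integer> list = Arrays.asList(array);
DataStream<Tuple2<Integer, Integer>> counts = env.fromCollection(list)
        .windowAll(GlobalWindows.create())   // one global, non-keyed window
        .trigger(CountTrigger.of(5))         // evaluate the window every 5 elements
        .apply(new AllWindowFunction<Integer, Tuple2<Integer, Integer>, GlobalWindow>() {
            @Override
            public void apply(GlobalWindow window, Iterable<Integer> tuples, Collector<Tuple2<Integer, Integer>> out) throws Exception {
                // Count how often each item occurs among the window's elements
                HashMap<Integer, Integer> map = new HashMap<>();
                for (Integer tuple : tuples) {
                    Integer value = 0;
                    if (map.containsKey(tuple)) {
                        value = map.get(tuple);
                    }
                    map.put(tuple, value + 1);
                }

                // Emit one (item, count) tuple per distinct item
                for (Map.Entry<Integer, Integer> entry : map.entrySet()) {
                    out.collect(new Tuple2<>(entry.getKey(), entry.getValue()));
                }
            }
        });

counts.print();

env.execute("Stream WordCount");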
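A note on this trigger setup, consistent with the follow-up in this thread: `CountTrigger` fires every 5 elements but does not purge the global window, so each firing re-evaluates every element received so far. This code should therefore already emit running totals, like the expected output in the question, while keeping all elements in window state. A plain-Java simulation of fire-without-purge (hypothetical names, no Flink dependency):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NonPurgingCountTrigger {

    // Simulates a global window with a count trigger that fires every n elements
    // WITHOUT purging: each firing counts items over everything buffered so far.
    static List<Map<Integer, Integer>> firings(int[] stream, int n) {
        List<Integer> windowContents = new ArrayList<>(); // never cleared
        List<Map<Integer, Integer>> results = new ArrayList<>();
        for (int item : stream) {
            windowContents.add(item);
            if (windowContents.size() % n == 0) { // count reached: fire
                Map<Integer, Integer> counts = new LinkedHashMap<>();
                for (int e : windowContents) counts.merge(e, 1, Integer::sum);
                results.add(counts);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        int[] stream = {1, 2, 4, 3, 4, 3, 4, 5, 4, 6, 7, 3, 3, 6, 1, 1, 3, 2, 4, 6};
        for (Map<Integer, Integer> result : firings(stream, 5)) {
            System.out.println(result);
        }
    }
}
```

The fourth firing prints `{1=3, 2=2, 4=5, 3=5, 5=1, 6=3, 7=1}`; the `HashMap`-based function above would emit the same counts as (item, count) tuples, possibly in a different order. The cost is that the window's buffer grows with the whole stream.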





On 08/03/16 02:57, "Marcela Charfuelan" <[hidden email]> wrote:

>hello,
>
>I want to make a function for counting items (per type) in windows of
>size N; For example for N=5 and the stream:
>1 2 4 3 4 3 4 5 4 6 7 3 3 6 1 1 3 2 4 6
>
>I would like to generate the tuples:
>w(1 2 4 3 4) -> (1,1)(2,1)(4,2)(3,1)
>w(3 4 5 4 6) -> (1,1)(2,1)(4,4)(3,2)(5,1)(6,1)
>w(7 3 3 6 1) -> (1,2)(2,1)(4,4)(3,4)(5,1)(6,2)(7,1)
>w(1 3 2 4 6) -> (1,3)(2,2)(4,5)(3,5)(5,1)(6,3)(7,1)
>
>I am trying to apply my own function with "Window apply", something like:
>
>items
>.windowAll(GlobalWindows.create())
>.trigger(CountTrigger.of(5))
>.apply(new MyWindowfunction())
>
>but in this case there is a parameters mismatch with apply and
>WindowFunction, so I am not sure if it is not possible here. any suggestion?
>
>Looking at the streaming java examples, the (commented) apply example
>shown in GroupedProcessingTimeWindowExample()
>which is applied to a timeWindow, does not work either:
>
>.keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
>.timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
>.apply(new SummingWindowFunction())
>
>So what I am missing here? any help is appreciated.
>
>Regards,
>Marcela.
>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: Window apply problem

Marcela Charfuelan
Thanks Jun,
this is very useful. I was confusing the type parameters because my input is tuples,
which I might not need in the end...

I now have what I wanted (non-parallel and probably not very efficient; any
suggestions for improvement are welcome). I had to modify the trigger so that it
returns FIRE_AND_PURGE when it reaches N, the maximum number of items per window;
otherwise it counts all the data received so far every time...

So my example looks like this now:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> transactions = env.fromElements(
        "1 2 4 3 4",
        "3 4 5 4 6",
        "7 3 3 6 1",
        "1 3 2 4 6"
);
DataStream<Hashtable<String, Integer>> counts = transactions
        .flatMap(new LineSplitter())  // because I am expecting one transaction per line
        .windowAll(GlobalWindows.create())
        .trigger(MyCountTrigger.of(5))
        .apply(new MyWindowFunction());

counts.print();
env.execute("ItemsCount");


public static class MyWindowFunction implements
        AllWindowFunction<Tuple2<String, Integer>, Hashtable<String, Integer>, GlobalWindow> {

    // Running totals, kept across windows (the same function instance handles every firing)
    public Hashtable<String, Integer> itemsMap = new Hashtable<String, Integer>();

    @Override
    public void apply(GlobalWindow window,
                      Iterable<Tuple2<String, Integer>> tuples,
                      Collector<Hashtable<String, Integer>> out) throws Exception {
        for (Tuple2<String, Integer> tuple : tuples) {
            if (itemsMap.containsKey(tuple.f0)) {
                itemsMap.put(tuple.f0, itemsMap.get(tuple.f0) + 1);
            } else {
                itemsMap.put(tuple.f0, 1);
            }
        }
        out.collect(itemsMap);
    }
}
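`LineSplitter` and `MyCountTrigger` are not shown in the thread. The design above purges each window and keeps the running totals in a field of the function instance. That relies on a single instance seeing every window (a non-keyed `windowAll` runs with parallelism 1), and the field is plain Java state rather than Flink-managed state, so it is not part of checkpoints. A plain-Java sketch of the same logic, assuming each line holds exactly 5 items (so one line is one window) and using a hypothetical whitespace tokenizer in place of `LineSplitter`; unlike the code above, it returns a copy of the totals per window instead of the shared map, so earlier results cannot change when later windows arrive:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Hashtable;
import java.util.List;

public class RunningItemCounter {

    // Running totals kept across windows, playing the role of itemsMap above.
    private final Hashtable<String, Integer> itemsMap = new Hashtable<>();

    // Adds one (already purged) window of items to the totals and returns a snapshot copy.
    Hashtable<String, Integer> applyWindow(List<String> window) {
        for (String item : window) {
            itemsMap.merge(item, 1, Integer::sum);
        }
        return new Hashtable<>(itemsMap); // copy, so later windows do not change earlier results
    }

    // Hypothetical stand-in for LineSplitter: splits one line on whitespace.
    static List<String> split(String line) {
        return Arrays.asList(line.trim().split("\\s+"));
    }

    static List<Hashtable<String, Integer>> run(String[] lines) {
        RunningItemCounter counter = new RunningItemCounter();
        List<Hashtable<String, Integer>> results = new ArrayList<>();
        for (String line : lines) {
            // Assumes each line holds exactly N = 5 items, so one line == one window.
            results.add(counter.applyWindow(split(line)));
        }
        return results;
    }

    public static void main(String[] args) {
        String[] lines = {"1 2 4 3 4", "3 4 5 4 6", "7 3 3 6 1", "1 3 2 4 6"};
        List<Hashtable<String, Integer>> results = run(lines);
        System.out.println(results.get(0).get("4")); // 2
        System.out.println(results.get(3).get("4")); // 5
    }
}
```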

Regards,
Marcela.



On 08.03.2016 09:34, Wang Yangjun wrote:

> Hello Marcela,
>
> I am not sure what is the “parameters mismatch” here. From the example you shown, it seems that you just want do a window word count. Right?
>
> Could you try this code and is it want you want?
>
> Best,
> Jun
> -------------------------------------------------
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> Integer[] array = new Integer[]{1, 2, 4, 3, 4, 3, 4, 5, 4, 6, 7, 3, 3, 6, 1, 1, 3, 2, 4, 6};
> List<Integer> list = Arrays.asList(array);
> DataStream<Tuple2<Integer, Integer>> counts = env.fromCollection(list)
>          .windowAll(GlobalWindows.create())
>          .trigger(CountTrigger.of(5)).apply(new AllWindowFunction<Integer, Tuple2<Integer, Integer>, GlobalWindow>() {
>              @Override
>              public void apply(GlobalWindow window, Iterable<Integer> tuples, Collector<Tuple2<Integer, Integer>> out) throws Exception {
>                  HashMap<Integer, Integer> map = new HashMap<>();
>                  for(Integer tuple : tuples){
>                      Integer value = 0;
>                      if(map.containsKey(tuple)){
>                          value = map.get(tuple);
>                      }
>                      map.put(tuple, value+1);
>                  }
>
>                  for(Map.Entry<Integer, Integer> entry : map.entrySet()) {
>                      out.collect(new Tuple2<>(entry.getKey(), entry.getValue()));
>                  }
>              }
>          });
>
> counts.print();
>
> env.execute("Stream WordCount");
>
>
>
>
>
> On 08/03/16 02:57, "Marcela Charfuelan" <[hidden email]> wrote:
>
>> hello,
>>
>> I want to make a function for counting items (per type) in windows of
>> size N; For example for N=5 and the stream:
>> 1 2 4 3 4 3 4 5 4 6 7 3 3 6 1 1 3 2 4 6
>>
>> I would like to generate the tuples:
>> w(1 2 4 3 4) -> (1,1)(2,1)(4,2)(3,1)
>> w(3 4 5 4 6) -> (1,1)(2,1)(4,4)(3,2)(5,1)(6,1)
>> w(7 3 3 6 1) -> (1,2)(2,1)(4,4)(3,4)(5,1)(6,2)(7,1)
>> w(1 3 2 4 6) -> (1,3)(2,2)(4,5)(3,5)(5,1)(6,3)(7,1)
>>
>> I am trying to apply my own function with "Window apply", something like:
>>
>> items
>> .windowAll(GlobalWindows.create())
>> .trigger(CountTrigger.of(5))
>> .apply(new MyWindowfunction())
>>
>> but in this case there is a parameters mismatch with apply and
>> WindowFunction, so I am not sure if it is not possible here. any suggestion?
>>
>> Looking at the streaming java examples, the (commented) apply example
>> shown in GroupedProcessingTimeWindowExample()
>> which is applied to a timeWindow, does not work either:
>>
>> .keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
>> .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
>> .apply(new SummingWindowFunction())
>>
>> So what I am missing here? any help is appreciated.
>>
>> Regards,
>> Marcela.
>>
>>
>>


--
Dr. Marcela Charfuelan, Senior Researcher
TU Berlin, School of Electrical Engineering and Computer Sciences
Database Systems and Information Management (DIMA)
EN7, Einsteinufer 17, D-10587 Berlin
Room: EN 725  Phone: +49 30-314-23556
URL: http://www.user.tu-berlin.de/charfuelan

Re: Window apply problem

Aljoscha Krettek
Hi,
there is also PurgingTrigger, which turns any Trigger into a trigger that also purges when firing. Use it like this:

.trigger(PurgingTrigger.of(CountTrigger.of(5)))
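`PurgingTrigger.of(...)` wraps another trigger so that whenever the wrapped trigger fires, the window contents are purged as well; with it, a custom trigger such as `MyCountTrigger` should no longer be necessary. Flink's `countWindowAll(5)` should be a shorthand for this same combination of `GlobalWindows` and `PurgingTrigger.of(CountTrigger.of(5))`. The plain-Java sketch below mimics the wrapping idea with a hypothetical, simplified trigger interface (not Flink's `Trigger` class) and compares the window contents seen at each firing with and without the wrapper:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PurgingWrapperSketch {

    // Hypothetical, simplified trigger decisions (Flink's own TriggerResult has more cases).
    enum Result { CONTINUE, FIRE, FIRE_AND_PURGE }

    // Hypothetical, simplified trigger: asked once per incoming element.
    interface SimpleTrigger {
        Result onElement();
    }

    // Fires every n elements but never purges, like a plain count trigger.
    static SimpleTrigger countTrigger(int n) {
        int[] count = {0}; // mutable counter captured by the lambda
        return () -> {
            if (++count[0] >= n) {
                count[0] = 0;
                return Result.FIRE;
            }
            return Result.CONTINUE;
        };
    }

    // Decorator: delegates to the wrapped trigger and turns FIRE into FIRE_AND_PURGE.
    static SimpleTrigger purging(SimpleTrigger inner) {
        return () -> {
            Result r = inner.onElement();
            return r == Result.FIRE ? Result.FIRE_AND_PURGE : r;
        };
    }

    // Buffers elements in one global window and records its contents at every firing.
    static List<List<Integer>> run(List<Integer> stream, SimpleTrigger trigger) {
        List<Integer> window = new ArrayList<>();
        List<List<Integer>> fired = new ArrayList<>();
        for (Integer item : stream) {
            window.add(item);
            Result r = trigger.onElement();
            if (r == Result.FIRE || r == Result.FIRE_AND_PURGE) {
                fired.add(new ArrayList<>(window));
            }
            if (r == Result.FIRE_AND_PURGE) {
                window.clear();
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        List<Integer> stream = Arrays.asList(1, 2, 4, 3, 4, 3, 4, 5, 4, 6);
        System.out.println(run(stream, countTrigger(5)));
        System.out.println(run(stream, purging(countTrigger(5))));
    }
}
```

With the plain count trigger the second firing sees all 10 elements (`[1, 2, 4, 3, 4, 3, 4, 5, 4, 6]`); with the purging wrapper it sees only the last 5 (`[3, 4, 5, 4, 6]`), which is the tumbling behaviour a count window needs.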

Cheers,
Aljoscha

> On 08 Mar 2016, at 17:23, Marcela Charfuelan <[hidden email]> wrote: