Hello,

I want to write a function that counts items (per type) in windows of size N. For example, for N=5 and the stream:

1 2 4 3 4 3 4 5 4 6 7 3 3 6 1 1 3 2 4 6

I would like to generate the tuples:

w(1 2 4 3 4) -> (1,1)(2,1)(4,2)(3,1)
w(3 4 5 4 6) -> (1,1)(2,1)(4,4)(3,2)(5,1)(6,1)
w(7 3 3 6 1) -> (1,2)(2,1)(4,4)(3,4)(5,1)(6,2)(7,1)
w(1 3 2 4 6) -> (1,3)(2,2)(4,5)(3,5)(5,1)(6,3)(7,1)

I am trying to apply my own function with "Window apply", something like:

    items
        .windowAll(GlobalWindows.create())
        .trigger(CountTrigger.of(5))
        .apply(new MyWindowfunction())

but in this case there is a parameter mismatch between apply and WindowFunction, so I am not sure whether this is possible here. Any suggestions?

Looking at the streaming Java examples, the (commented-out) apply example shown in GroupedProcessingTimeWindowExample(), which is applied to a timeWindow, does not work either:

    .keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
    .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
    .apply(new SummingWindowFunction())

So what am I missing here? Any help is appreciated.

Regards,
Marcela.
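To make the requested output concrete, here is a minimal plain-Java sketch (no Flink involved) of the semantics the example tuples imply: the counts are cumulative, and a snapshot is emitted after every N items. The class and method names (CumulativeWindowCounts, countPerWindow) are made up for illustration and do not come from the thread or from any library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, only meant to illustrate the expected result.
class CumulativeWindowCounts {

    /** Returns one snapshot of the running per-item counts after every n items. */
    static List<Map<Integer, Integer>> countPerWindow(List<Integer> stream, int n) {
        // LinkedHashMap keeps items in first-seen order, like the tuples in the question.
        Map<Integer, Integer> running = new LinkedHashMap<>();
        List<Map<Integer, Integer>> snapshots = new ArrayList<>();
        int seen = 0;
        for (Integer item : stream) {
            running.merge(item, 1, Integer::sum); // increment, starting at 1
            seen++;
            if (seen % n == 0) {
                // Copy the map so that later updates do not change earlier snapshots.
                snapshots.add(new LinkedHashMap<>(running));
            }
        }
        return snapshots;
    }

    public static void main(String[] args) {
        List<Integer> stream = Arrays.asList(
                1, 2, 4, 3, 4, 3, 4, 5, 4, 6, 7, 3, 3, 6, 1, 1, 3, 2, 4, 6);
        for (Map<Integer, Integer> snapshot : countPerWindow(stream, 5)) {
            System.out.println(snapshot);
        }
    }
}
```

For the stream above and N=5 this yields four snapshots whose entries match the four w(...) lines in the question.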
Hello Marcela,
I am not sure what the "parameters mismatch" is here. From the example you have shown, it seems that you just want to do a windowed word count, right?

Could you try this code and see whether it is what you want?

Best,
Jun
-------------------------------------------------
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
Integer[] array = new Integer[]{1, 2, 4, 3, 4, 3, 4, 5, 4, 6, 7, 3, 3, 6, 1, 1, 3, 2, 4, 6};
List<Integer> list = Arrays.asList(array);
DataStream<Tuple2<Integer, Integer>> counts = env.fromCollection(list)
    .windowAll(GlobalWindows.create())
    .trigger(CountTrigger.of(5))
    .apply(new AllWindowFunction<Integer, Tuple2<Integer, Integer>, GlobalWindow>() {
        @Override
        public void apply(GlobalWindow window, Iterable<Integer> tuples, Collector<Tuple2<Integer, Integer>> out) throws Exception {
            // Count how often each item occurs among the elements passed to this call.
            HashMap<Integer, Integer> map = new HashMap<>();
            for (Integer tuple : tuples) {
                Integer value = 0;
                if (map.containsKey(tuple)) {
                    value = map.get(tuple);
                }
                map.put(tuple, value + 1);
            }

            // Emit one (item, count) pair per distinct item.
            for (Map.Entry<Integer, Integer> entry : map.entrySet()) {
                out.collect(new Tuple2<>(entry.getKey(), entry.getValue()));
            }
        }
    });

counts.print();

env.execute("Stream WordCount");

On 08/03/16 02:57, "Marcela Charfuelan" <[hidden email]> wrote:
> [...]
Thanks Jun,
Very useful. I was confusing the parameters because my input is tuples, which I might not need in the end...

I now have what I wanted (non-parallel and probably not very efficient; any suggestions for improvement are welcome). I had to modify the trigger so that it returns FIRE_AND_PURGE when it reaches N, the maximum number of items per window; otherwise it would count the whole data every time.

So my example looks like this now:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> transactions = env.fromElements(
    "1 2 4 3 4",
    "3 4 5 4 6",
    "7 3 3 6 1",
    "1 3 2 4 6"
);
DataStream<Hashtable<String, Integer>> counts = transactions
    .flatMap(new LineSplitter()) // because I am expecting one transaction per line
    .windowAll(GlobalWindows.create())
    .trigger(MyCountTrigger.of(5))
    .apply(new MyWindowFunction());

counts.print();
env.execute("ItemsCount");


public static class MyWindowFunction implements AllWindowFunction<Tuple2<String,Integer>, Hashtable<String, Integer>, GlobalWindow> {
    // Kept as a field, so the counts accumulate across windows.
    public Hashtable<String, Integer> itemsMap = new Hashtable<String, Integer>();

    @Override
    public void apply(GlobalWindow window,
                      Iterable<Tuple2<String,Integer>> tuples,
                      Collector<Hashtable<String, Integer>> out) throws Exception {
        for (Tuple2<String,Integer> tuple : tuples) {
            if (itemsMap.containsKey(tuple.f0)) {
                itemsMap.put(tuple.f0, itemsMap.get(tuple.f0) + 1);
            } else {
                itemsMap.put(tuple.f0, 1);
            }
        }
        out.collect(itemsMap);
    }
}

Regards,
Marcela.

On 08.03.2016 09:34, Wang Yangjun wrote:
> [...]

--
Dr. Marcela Charfuelan, Senior Researcher
TU Berlin, School of Electrical Engineering and Computer Sciences
Database Systems and Information Management (DIMA)
EN7, Einsteinufer 17, D-10587 Berlin
Room: EN 725 Phone: +49 30-314-23556
URL: http://www.user.tu-berlin.de/charfuelan
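The need for FIRE_AND_PURGE described above can be illustrated without Flink. The following is only a plain-Java simulation under simplifying assumptions: a list stands in for the window contents, a boolean flag stands in for the purging behaviour, and the map that outlives each firing plays the role of the itemsMap field. The names PurgeSimulation and run are invented for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation; it does not use or reproduce the Flink API.
class PurgeSimulation {

    /**
     * Buffers items, "fires" every n items, and on each firing adds the whole
     * buffer to a map that is kept across firings. Returns the final counts.
     */
    static Map<String, Integer> run(List<String> items, int n, boolean purgeOnFire) {
        Map<String, Integer> itemsMap = new HashMap<>(); // state kept across firings
        List<String> buffer = new ArrayList<>();         // stand-in for the window contents
        int sinceLastFire = 0;
        for (String item : items) {
            buffer.add(item);
            sinceLastFire++;
            if (sinceLastFire == n) {
                // The "window function": count everything currently buffered.
                for (String buffered : buffer) {
                    itemsMap.merge(buffered, 1, Integer::sum);
                }
                if (purgeOnFire) {
                    buffer.clear(); // drop the elements that were just counted
                }
                sinceLastFire = 0;
            }
        }
        return itemsMap;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("1", "2", "4", "3", "4", "3", "4", "5", "4", "6");
        System.out.println("with purge:    " + run(items, 5, true));
        System.out.println("without purge: " + run(items, 5, false));
    }
}
```

With the ten items used in main, item "4" is counted 4 times with purging and 6 times without, because the first five items are still in the buffer and get added again on the second firing.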
Hi,
There is also PurgingTrigger, which turns any Trigger into a trigger that also purges when firing. Use it like this:

    .trigger(PurgingTrigger.of(CountTrigger.of(5)))

Cheers,
Aljoscha

> On 08 Mar 2016, at 17:23, Marcela Charfuelan <[hidden email]> wrote:
> [...]
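The idea behind a purging wrapper can be sketched as a small decorator in plain Java. This is a simplified model and not Flink's actual classes: the Result enum, the SimpleTrigger interface and both implementations below are hypothetical stand-ins, reduced to a single per-element callback.

```java
// Hypothetical, simplified model of a trigger decorator; not the Flink API.
class PurgingTriggerSketch {

    enum Result { CONTINUE, FIRE, FIRE_AND_PURGE }

    /** Simplified trigger: called once for every element added to the window. */
    interface SimpleTrigger {
        Result onElement();
    }

    /** Fires after every maxCount elements, then starts counting again. */
    static final class SimpleCountTrigger implements SimpleTrigger {
        private final int maxCount;
        private int count;

        SimpleCountTrigger(int maxCount) {
            this.maxCount = maxCount;
        }

        @Override
        public Result onElement() {
            count++;
            if (count >= maxCount) {
                count = 0;
                return Result.FIRE;
            }
            return Result.CONTINUE;
        }
    }

    /** Wraps another trigger and turns every FIRE into FIRE_AND_PURGE. */
    static final class Purging implements SimpleTrigger {
        private final SimpleTrigger nested;

        Purging(SimpleTrigger nested) {
            this.nested = nested;
        }

        @Override
        public Result onElement() {
            Result result = nested.onElement();
            return result == Result.FIRE ? Result.FIRE_AND_PURGE : result;
        }
    }

    public static void main(String[] args) {
        SimpleTrigger trigger = new Purging(new SimpleCountTrigger(5));
        for (int i = 1; i <= 10; i++) {
            System.out.println("element " + i + " -> " + trigger.onElement());
        }
    }
}
```

The wrapper leaves the counting logic untouched and only changes what happens on firing, which is why it can be combined with any nested trigger instead of writing a custom count trigger.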