Count Different Codes in a Window

Posted by Raj Kumar on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Count-Different-Codes-in-a-Window-tp14391.html

Hi,

We have a requirement to aggregate data every 10 minutes and write the aggregated results to Elasticsearch exactly once per window. Right now, we iterate over the window's Iterable and count the different status codes by hand. Is there a better way to count the different status codes?

public void apply(TimeWindow timeWindow, Iterable<Tuple4<String, Long, String, String>> iterable, Collector<Tuple4<String, Long, String, String>> collector) throws Exception {

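            // counts[0]=total, [1]=5xx, [2]=4xx, [3]=2xx, [4]=GET, [5]=POST, [6]=PUT, [7]=HEAD
            // (a new long[] is already zero-initialized, so Arrays.fill is not needed)
            long[] counts = new long[10];

            // count the different types of records in the window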
            for (Tuple4<String, Long, String, String> in : iterable) {
                counts[0]++;
                if (in.f2!=null && in.f2.startsWith("5"))
                    counts[1]++;
                else if (in.f2!=null && in.f2.startsWith("4"))
                    counts[2]++;
                else if (in.f2!=null && in.f2.startsWith("2"))
                    counts[3]++;

                if(in.f3!=null && in.f3.equalsIgnoreCase("GET"))
                    counts[4]++;
                else if(in.f3!=null && in.f3.equalsIgnoreCase("POST"))
                    counts[5]++;
                else if(in.f3!=null && in.f3.equalsIgnoreCase("PUT"))
                    counts[6]++;
                else if(in.f3!=null && in.f3.equalsIgnoreCase("HEAD"))
                    counts[7]++;

            }
...
}
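
One possible alternative to iterating over the whole window in apply() is to keep a small accumulator and fold each record into it as it arrives. Below is a minimal sketch of such an accumulator in plain Java. The class name StatusCounts and its fields are hypothetical and not part of the original job. The add/merge shape is chosen so that it could be wrapped in Flink's AggregateFunction (createAccumulator, add, getResult, merge), assuming the Flink version in use provides that interface. The wiring into the window is not shown here.

```java
/**
 * Hypothetical accumulator for the per-window counts (illustrative only).
 * It mirrors the counting rules of the loop above: status is field f2,
 * method is field f3, and either may be null.
 */
final class StatusCounts {
    long total;
    long status5xx, status4xx, status2xx;
    long get, post, put, head;

    /** Folds one record into the running counts and returns this accumulator. */
    StatusCounts add(String status, String method) {
        total++;
        if (status != null) {
            if (status.startsWith("5")) status5xx++;
            else if (status.startsWith("4")) status4xx++;
            else if (status.startsWith("2")) status2xx++;
        }
        if (method != null) {
            if (method.equalsIgnoreCase("GET")) get++;
            else if (method.equalsIgnoreCase("POST")) post++;
            else if (method.equalsIgnoreCase("PUT")) put++;
            else if (method.equalsIgnoreCase("HEAD")) head++;
        }
        return this;
    }

    /** Adds another accumulator's counts into this one (e.g. when partial results are combined). */
    StatusCounts merge(StatusCounts other) {
        total += other.total;
        status5xx += other.status5xx;
        status4xx += other.status4xx;
        status2xx += other.status2xx;
        get += other.get;
        post += other.post;
        put += other.put;
        head += other.head;
        return this;
    }
}
```

With this shape, only one small object per key and window has to be kept instead of every record, and the single write to Elasticsearch would happen when the window fires and emits the final counts.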