Hi,
I am working on a use case where I have a stream of events. I want to attach a unique id to all the events that happened in a session. Below is the logic that I am trying to implement:

1. A session starts.
2. Whenever an event with event_name=search arrives, generate a unique search_id and attach this id to all the following events in the session until a new "search" event is encountered in the session.

Example:

user-1 session-1 event_name=search (generate search_id 1)
user-1 session-1 event_name=x (attach search_id 1)
user-1 session-1 event_name=y (attach search_id 1)
user-1 session-1 event_name=y (attach search_id 1)
user-1 session-1 event_name=search (generate search_id 2)
user-1 session-1 event_name=x (attach search_id 2)
user-1 session-1 event_name=y (attach search_id 2)
user-1 session-1 event_name=y (attach search_id 2)

As events can arrive out of order, I want to do this after the session window is over. So after the session window I am doing this:

1. Sort all the events by time.
2. Iterate over each event and attach the search_id.
3. Collect all the events and generate another stream enriched with search_id.

I am trying the code below, but it is not working as expected, and I am not able to understand what is happening.
dataStream.keyBy((KeySelector<GenericRecord, String>) record -> {
        StringBuilder builder = new StringBuilder();
        builder.append(record.get("session_id"));
        builder.append(record.get("user_id"));
        return builder.toString();
    })
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .process(new ProcessWindowFunction<GenericRecord, GenericRecord, String, TimeWindow>() {
        @Override
        public void process(String key, Context context, Iterable<GenericRecord> iterable,
                            Collector<GenericRecord> collector) throws Exception {
            Stream<GenericRecord> result = IterableUtils.toStream(iterable);
            List<GenericRecord> s = result.collect(Collectors.toList());

            Map<Long, GenericRecord> recordMap = new HashMap<>();
            for (GenericRecord record : s) {
                recordMap.put((long) record.get("event_ts"), record);
            }

            Map<Long, GenericRecord> sortedRecordMap = new LinkedHashMap<>();
            recordMap.entrySet().stream()
                .sorted(Map.Entry.comparingByKey())
                .forEachOrdered(x -> sortedRecordMap.put(x.getKey(), x.getValue()));

            String search_id = null;
            for (Map.Entry<Long, GenericRecord> element : sortedRecordMap.entrySet()) {
                GenericRecord record = element.getValue();
                if (record.get("event_name").equals("search")) {
                    search_id = UUID.randomUUID().toString();
                }
                record.put("search_id", search_id);
                collector.collect(record);
            }
        }
    }).print();
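For comparison, the three steps described above (sort, iterate, attach) can be sketched in plain Java without Flink. This is only a sketch under stated assumptions: `Event` is a hypothetical stand-in for the `GenericRecord` fields used in the code above, and `SearchIdEnricher`, `attachSearchIds` and its `idSupplier` parameter are illustrative names, not part of any library. It sorts a list instead of building a map keyed by `event_ts`, because a map keeps only one record per key, so events that share a timestamp would be lost. It also compares the event name by content rather than with `equals("search")`, since Avro may return string fields as `Utf8` objects that are never equal to a `String`; whether that applies here depends on how the records are deserialized.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;
import java.util.function.Supplier;

final class SearchIdEnricher {

    /** Hypothetical stand-in for the record fields used in the thread. */
    static final class Event {
        final CharSequence eventName; // CharSequence so a non-String value also works
        final long eventTs;
        String searchId;              // filled in by attachSearchIds

        Event(CharSequence eventName, long eventTs) {
            this.eventName = eventName;
            this.eventTs = eventTs;
        }
    }

    /**
     * Sorts the events of one session by timestamp and attaches the id of the
     * most recent "search" event to every event from that search onward.
     * Events before the first search keep a null searchId.
     */
    static List<Event> attachSearchIds(Iterable<Event> windowContents,
                                       Supplier<String> idSupplier) {
        List<Event> sorted = new ArrayList<>();
        windowContents.forEach(sorted::add);

        // List.sort is stable, so events with equal timestamps are all kept
        // and stay in their arrival order.
        sorted.sort(Comparator.comparingLong((Event e) -> e.eventTs));

        String currentSearchId = null;
        for (Event e : sorted) {
            // Compare by content, not by equals(), in case the name is not a String.
            if (e.eventName != null && "search".contentEquals(e.eventName)) {
                currentSearchId = idSupplier.get();
            }
            e.searchId = currentSearchId;
        }
        return sorted;
    }

    public static void main(String[] args) {
        // Out-of-order input with two events sharing timestamp 30.
        List<Event> session = Arrays.asList(
            new Event("y", 30L),
            new Event("search", 10L),
            new Event("x", 30L),
            new Event("search", 40L),
            new Event("x", 50L));

        for (Event e : attachSearchIds(session, () -> UUID.randomUUID().toString())) {
            System.out.println(e.eventTs + " " + e.eventName + " " + e.searchId);
        }
    }
}
```

Inside the window function, the same method could be called on the window's `Iterable` after mapping each record to the fields it needs, with each result passed to the collector.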
Hi,
Is using a session window a good idea for implementing the above logic, or should I use a process function?
On Sun, Mar 1, 2020 at 11:39 AM aj <[hidden email]> wrote:
Please help me implement the above logic.
On Mon, Mar 2, 2020 at 4:47 PM aj <[hidden email]> wrote: