Flink Session Window to enrich Event with unique id


anuj.aj07
Hi,

I am working on a use case where I have a stream of events. I want to attach a unique id to all the events that happen within a session.
Below is the logic that I am trying to implement:

1. session_started
2. Whenever an event with event_name=search arrives, generate a unique search_id and attach this id to all the following events in the session until a new "search" event is encountered in the session.

Example:
user-1  session-1  event_name=search  (generate search_id 1)
user-1  session-1  event_name=x       (attach search_id 1)
user-1  session-1  event_name=y       (attach search_id 1)
user-1  session-1  event_name=y       (attach search_id 1)
user-1  session-1  event_name=search  (generate search_id 2)
user-1  session-1  event_name=x       (attach search_id 2)
user-1  session-1  event_name=y       (attach search_id 2)
user-1  session-1  event_name=y       (attach search_id 2)

As events can arrive out of order, I want to do this after the session window has closed. Once the session window is over, I do the following:

1. Sort all the events by time.
2. Iterate over each event and attach the search_id.
3. Collect all the events and emit another stream enriched with search_id.
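
The three steps above can be sketched in plain Java, independent of Flink. This is only an illustration under assumptions: the `Event` class, the `tag` method, and the counter-based ids are hypothetical stand-ins, whereas the actual job works on Avro `GenericRecord` values and uses UUIDs.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SearchIdTagger {

    // Hypothetical stand-in for one event of a session; not the Avro record from the job.
    static final class Event {
        final String eventName;
        final long eventTs;
        String searchId; // filled in by tag()

        Event(String eventName, long eventTs) {
            this.eventName = eventName;
            this.eventTs = eventTs;
        }
    }

    // Sorts the events of one session by timestamp, then gives every event the id
    // of the most recent "search" event. Events before the first search keep null.
    static List<Event> tag(List<Event> sessionEvents) {
        List<Event> sorted = new ArrayList<>(sessionEvents);
        // List.sort is stable, so events with equal timestamps keep their arrival order.
        sorted.sort(Comparator.comparingLong((Event e) -> e.eventTs));

        int searchCounter = 0;         // sequential ids for readability; a UUID would also work
        String currentSearchId = null;
        for (Event e : sorted) {
            if ("search".equals(e.eventName)) {
                currentSearchId = "search-" + (++searchCounter);
            }
            e.searchId = currentSearchId;
        }
        return sorted;
    }

    public static void main(String[] args) {
        // Events arrive out of order: the second search (ts=5) arrives before x (ts=2).
        List<Event> arrived = new ArrayList<>();
        arrived.add(new Event("search", 1));
        arrived.add(new Event("search", 5));
        arrived.add(new Event("x", 2));
        arrived.add(new Event("y", 6));
        arrived.add(new Event("y", 3));

        for (Event e : tag(arrived)) {
            System.out.println(e.eventTs + " " + e.eventName + " " + e.searchId);
        }
    }
}
```

With this input, the events at timestamps 1, 2 and 3 end up with the first id and the events at timestamps 5 and 6 with the second one, regardless of arrival order.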

I am trying the code below, but it is not working as expected, and I am not able to understand what is happening.

dataStream.keyBy((KeySelector<GenericRecord, String>) record -> {
            // Key each event by session_id followed by user_id.
            StringBuilder builder = new StringBuilder();
            builder.append(record.get("session_id"));
            builder.append(record.get("user_id"));
            return builder.toString();
        })
        .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
        .process(new ProcessWindowFunction<GenericRecord, GenericRecord, String, TimeWindow>() {
            @Override
            public void process(String key, Context context, Iterable<GenericRecord> iterable, Collector<GenericRecord> collector) throws Exception {
                // Copy the window contents into a list.
                Stream<GenericRecord> result = IterableUtils.toStream(iterable);
                List<GenericRecord> s = result.collect(Collectors.toList());

                // Index the records by event_ts, then order them by that timestamp.
                Map<Long, GenericRecord> recordMap = new HashMap<>();
                for (GenericRecord record : s) {
                    recordMap.put((long) record.get("event_ts"), record);
                }
                Map<Long, GenericRecord> sortedRecordMap = new LinkedHashMap<>();
                recordMap.entrySet().stream()
                        .sorted(Map.Entry.comparingByKey())
                        .forEachOrdered(x -> sortedRecordMap.put(x.getKey(), x.getValue()));

                // Start a new search_id at each "search" event and attach the current id to every event.
                String search_id = null;
                for (Map.Entry<Long, GenericRecord> element : sortedRecordMap.entrySet()) {
                    GenericRecord record = element.getValue();
                    if (record.get("event_name").equals("search")) {
                        search_id = UUID.randomUUID().toString();
                    }
                    record.put("search_id", search_id);
                    collector.collect(record);
                }
            }
        }).print();
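
A few details in the code above might explain unexpected output. These are guesses, since the thread does not say what the actual output was. First, Avro commonly returns string fields as `org.apache.avro.util.Utf8` rather than `java.lang.String`, and calling `equals` with a `String` argument on such a value yields false, so `search_id` may never be assigned. Second, a map keyed by `event_ts` keeps only one record per timestamp, so events sharing a timestamp are silently dropped. Third, with `ProcessingTimeSessionWindows` the window fires only after 10 minutes without new events for a key, and it is driven by arrival time rather than `event_ts`. The sketch below shows the first two effects in plain Java; `StringBuilder` merely stands in for a non-String `CharSequence`, and the helper names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WindowPitfalls {

    // Compares a field value to an expected name via its text, which works for
    // String and for other CharSequence implementations alike.
    static boolean hasName(Object fieldValue, String expected) {
        return fieldValue != null && expected.equals(fieldValue.toString());
    }

    // Counts how many entries survive when timestamps are used as map keys.
    static int sizeAsMap(long[] timestamps) {
        Map<Long, String> byTs = new HashMap<>();
        for (long ts : timestamps) {
            // A later event with the same timestamp overwrites the earlier one.
            byTs.put(ts, "event@" + ts);
        }
        return byTs.size();
    }

    // Counts how many entries survive when the events are kept in a list.
    static int sizeAsList(long[] timestamps) {
        List<String> events = new ArrayList<>();
        for (long ts : timestamps) {
            events.add("event@" + ts);
        }
        return events.size();
    }

    public static void main(String[] args) {
        // Stand-in for a non-String CharSequence such as Avro's Utf8 (assumption).
        Object name = new StringBuilder("search");
        System.out.println(name.equals("search"));   // false: not a String
        System.out.println(hasName(name, "search")); // true: compared by text

        long[] timestamps = {100L, 100L, 200L};
        System.out.println(sizeAsMap(timestamps));   // 2: one duplicate was lost
        System.out.println(sizeAsList(timestamps));  // 3: all events kept
    }
}
```

Under these assumptions, comparing with `"search".equals(String.valueOf(record.get("event_name")))` and sorting a `List<GenericRecord>` by `event_ts` instead of going through a map would avoid both effects.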


--
Thanks & Regards,
Anuj Jain


Re: Flink Session Window to enrich Event with unique id

anuj.aj07
Hi,
Is using a session window a good idea for implementing the above logic, or should I use a process function instead?

On Sun, Mar 1, 2020 at 11:39 AM aj <[hidden email]> wrote:

--
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07



Re: Flink Session Window to enrich Event with unique id

anuj.aj07
Please help me implement the above logic.

On Mon, Mar 2, 2020 at 4:47 PM aj <[hidden email]> wrote:
