How to sessionize stream with Apache Flink?

Milad khajavi
Hi guys,
I want to sessionize this stream: 1,1,1,2,2,2,2,2,3,3,3,3,3,3,3,0,3,3,3,5, ... into these sessions:

1,1,1
2,2,2,2,2
3,3,3,3,3,3,3
0
3,3,3
5

I wrote a custom trigger that detects when the stream value changes (from 1 to 2, 2 to 3, 3 to 0, and so on) and then fires. But this does not solve the problem: when I process the first element of the 2's and fire the trigger, the window is [1,1,1,2], whereas I need the trigger to fire on the last element of the 1's.


Here is the pseudocode of the onElement function in my custom trigger class:

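// prevState is a field of the trigger that holds the value of the previous element.
override def onElement(element: Session, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult = {
    if (prevState == element.value) {
      // Same value as the previous element: keep collecting.
      TriggerResult.CONTINUE
    } else {
      // Value changed: this fires on the first element of the new run,
      // which is already part of the window at this point.
      prevState = element.value
      TriggerResult.FIRE
    }
}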

How can I solve this problem?

--
Milād Khājavi
http://blog.khajavi.ir

Re: How to sessionize stream with Apache Flink?

Jonas Gröger
Hey Milad,

Since you cannot look ahead to see which element comes next, you have to "lag" one element behind. This requires building an operator that creates 2-tuples of (previous, current) from the incoming elements: basically a single-value state that emits the last and the current element together in a tuple.

In the trigger, the element is then of the 2-tuple type, so you can see changes "beforehand". The last element of the 1's then arrives as (1, 2).
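
A minimal sketch of this lagging idea in plain Scala, without any Flink API. The names `LagByOne`, `push`, and `LagDemo` are made up for illustration; in a real job the `previous` field would presumably live in Flink-managed state rather than in a plain variable.

```scala
// Hypothetical helper: remembers the previous element and pairs it with the current one.
final class LagByOne[A] {
  private var previous: Option[A] = None

  // Returns Some((previous, current)) once a previous element exists,
  // and None for the very first element of the stream.
  def push(current: A): Option[(A, A)] = {
    val pair = previous.map(p => (p, current))
    previous = Some(current)
    pair
  }
}

object LagDemo {
  private val lag = new LagByOne[Int]

  // Feed a short prefix of the example stream through the helper.
  val pairs: List[(Int, Int)] =
    List(1, 1, 1, 2, 2).flatMap(x => lag.push(x).toList)

  // A pair whose two components differ marks the last element of a run.
  val boundaries: List[(Int, Int)] = pairs.filter { case (prev, cur) => prev != cur }
}
```

Here `LagDemo.pairs` is (1,1), (1,1), (1,2), (2,2), and the only boundary is (1,2), which is the pair a trigger could fire on.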

Hope this helps.

Re: How to sessionize stream with Apache Flink?

Fabian Hueske-2
An alternative would be to use a FlatMapFunction with a ListState instead of a window with custom trigger.

When a new element arrives (i.e., the flatMap() method is called), you check if the value changed.
If the value did not change, you append the element to the state.
If the value changed, you emit the current list state as a session, clear the list, and add the new element as the first entry of the list state.

However, you should keep in mind that this assumes that the order of elements is preserved.
Flink ensures this within a partition, i.e., as long as elements are not shuffled and all operators run with the same parallelism.
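
The flatMap logic described above could be sketched roughly as follows. This is a plain-Scala model rather than Flink code: the `ArrayBuffer` stands in for the ListState, the `emit` callback stands in for the output collector, and the names `Sessionizer`, `onElement`, `pending`, and `SessionizerDemo` are assumptions made for illustration.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of the per-element logic; the buffer plays the role of the ListState.
final class Sessionizer[A] {
  private val buffer = ArrayBuffer.empty[A]

  // Called once per element, like flatMap(): emits the buffered session when the value changes.
  def onElement(value: A, emit: List[A] => Unit): Unit = {
    if (buffer.nonEmpty && buffer.last != value) {
      emit(buffer.toList) // value changed: the buffered elements form a finished session
      buffer.clear()
    }
    buffer += value       // the element joins the current run or starts a new one
  }

  // Elements of the session that is still open and has not been emitted yet.
  def pending: List[A] = buffer.toList
}

object SessionizerDemo {
  val input: List[Int] = List(1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 0, 3, 3, 3, 5)
  val sessions = ArrayBuffer.empty[List[Int]]
  val sessionizer = new Sessionizer[Int]

  input.foreach(v => sessionizer.onElement(v, s => { sessions += s; () }))
}
```

With the stream from the original question, the emitted sessions are 1,1,1 / 2,2,2,2,2 / 3,3,3,3,3,3,3 / 0 / 3,3,3. The final 5 stays in the buffer, because a session is only emitted once an element with a different value arrives.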

Best, Fabian


(In reply to Jonas's message of 2017-06-18 15:10 GMT+02:00.)