Posted by
Hamilton, Paul on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Session-Based-Windows-tp3137p3213.html
Hi Aljoscha,
Thanks a lot for your Trigger implementation; it definitely helped provide
some direction. It appears to be working well for our use case. One
thing I have noticed now that I have pulled the state API changes in is
that key-based state within a window function does not appear to be
working. Perhaps I am not using it correctly now that the API has
changed. Previously we had done something like this within the
RichWindowFunction:
    @Override
    public void open(final Configuration parameters) throws Exception {
        state = getRuntimeContext().getOperatorState("state", new StatePojo(), true);
    }
Based on the API changes I switched it to:
    @Override
    public void open(final Configuration parameters) throws Exception {
        state = getRuntimeContext().getKeyValueState("state", StatePojo.class, new StatePojo());
    }
But the state doesn’t seem to be partitioned based on the key. I haven’t
had much time to play around with it, so it's certainly possible that I
messed something up while refactoring to the API change. I will look at
it further when I get a chance, but if you have any thoughts they are much
appreciated.
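
To make the expected behaviour concrete, here is a minimal plain-Java sketch of what "partitioned by key" means: each key gets its own state instance, created from a default on first access. This is only an illustration and not how Flink implements keyed state; PerKeyState and CounterPojo are hypothetical names (CounterPojo stands in for StatePojo).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for key-partitioned state; NOT a Flink class.
class PerKeyState<K, S> {
    private final Map<K, S> states = new HashMap<>();
    private final Supplier<S> defaultState;

    PerKeyState(Supplier<S> defaultState) {
        this.defaultState = defaultState;
    }

    // Returns the state for the given key, creating it from the
    // default supplier the first time the key is seen.
    S get(K key) {
        return states.computeIfAbsent(key, k -> defaultState.get());
    }
}

// Hypothetical state object, standing in for StatePojo.
class CounterPojo {
    int count;
}
```

With this model, updates made while processing one session id are never visible when processing another, which is the behaviour I am not seeing after the refactoring.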
Thanks,
Paul Hamilton
On 10/17/15, 6:39 AM, "Aljoscha Krettek" <[hidden email]> wrote:
>Hi Paul,
>it’s good to see people interested in this. I sketched a Trigger that
>should fit your requirements:
>
>https://gist.github.com/aljoscha/a7c6f22548e7d24bc4ac
>You can use it like this:
>
>DataStream<> input = …
>DataStream<> result = input
> .keyBy("session-id")
> .window(GlobalWindows.create())
> .trigger(new SessionTrigger(timeout, maxElements))
> .apply(new MyWindowFunction())
>
>The Trigger uses the new state API that I’m currently introducing in a
>new Pull Request. It should be merged very soon, before the 0.10 release.
>
>This implementation has one caveat, though. It cannot deal with elements
>from different sessions that arrive intermingled with one another. The
>reason is that Flink does not yet support merging the windows that the
>WindowAssigner assigns, as, for example, the Cloud Dataflow API does.
>This means that elements cannot be assigned to session windows; instead,
>the workaround with the GlobalWindow has to be used. I want to tackle
>this for the release after 0.10, however.
>
>Please let us know if you need more information. I’m always happy to help
>in these interesting cases at the bleeding edge of what is possible. :-)
>
>Cheers,
>Aljoscha
>
>> On 16 Oct 2015, at 19:36, Hamilton, Paul <[hidden email]> wrote:
>>
>> Hi,
>>
>> I am attempting to make use of the new window APIs in streaming to
>> implement a session-based window and am not sure if the currently
>> provided functionality handles my use case. Specifically, what I want
>> to do is something conceptually similar to a
>> "Sessions.withGapDuration(…)" window in Google DataFlow.
>>
>> Assuming the events are keyed by session id, I would like to use the
>> event time and the watermarking functionality to trigger a window after
>> the "end of a session" (no events for a given session received within x
>> amount of time). With watermarking this would mean triggering when a
>> watermark is seen that is > (the time of the last event + session
>> timeout). I also want to perform an early triggering of the window
>> after a given number of events have been received.
>>
>> Is it currently possible to do this with the current combination of
>> window assigners and triggers? I am happy to write custom triggers etc.,
>> but wanted to make sure it wasn't already available before going down
>> that road.
>>
>> Thanks,
>>
>> Paul Hamilton
>> Hybris Software
>>
>>
>
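
The firing rule described in the original question (fire once a watermark is greater than the last event time plus the session timeout, and fire early after a given number of events) can be sketched in plain Java, independent of Flink. This is only an illustration of the decision logic for a single session id, not the Trigger from the gist and not a Flink API; SessionTracker, Decision, onElement and onWatermark are hypothetical names, and resetting the counter after an early firing is an assumption.

```java
// Hypothetical helper, NOT part of Flink: tracks one session's progress
// and decides when its window should fire.
class SessionTracker {
    enum Decision { CONTINUE, FIRE_EARLY, FIRE_SESSION_END }

    private final long timeoutMillis;
    private final int maxElements;
    private long lastEventTime = Long.MIN_VALUE; // MIN_VALUE means "no open session"
    private int count = 0;

    SessionTracker(long timeoutMillis, int maxElements) {
        this.timeoutMillis = timeoutMillis;
        this.maxElements = maxElements;
    }

    // Called for every element of this session, with its event timestamp.
    Decision onElement(long eventTime) {
        lastEventTime = Math.max(lastEventTime, eventTime);
        count++;
        if (count >= maxElements) {
            count = 0; // assumption: start counting again after an early firing
            return Decision.FIRE_EARLY;
        }
        return Decision.CONTINUE;
    }

    // Called for every watermark; the session ends when the watermark is
    // strictly greater than (last event time + session timeout).
    Decision onWatermark(long watermark) {
        if (lastEventTime != Long.MIN_VALUE && watermark > lastEventTime + timeoutMillis) {
            lastEventTime = Long.MIN_VALUE;
            count = 0;
            return Decision.FIRE_SESSION_END;
        }
        return Decision.CONTINUE;
    }
}
```

In a real job this logic would live inside a custom Trigger with lastEventTime and count held in per-key state, so that each session id is tracked separately.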