http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Session-Based-Windows-tp3137p3240.html
the key based state should now be fixed in the current 0.10-SNAPSHOT builds if you want to continue playing around with it.
> On 21 Oct 2015, at 19:40, Aljoscha Krettek <
[hidden email]> wrote:
>
> Hi Paul,
> good to hear that the windowing works for you.
>
> With the key based state I’m afraid you found a bug. The problem is that the state backend is not properly set to the right key when the window is evaluated. I will look into fixing this ASAP before the 0.10 release.
>
> Cheers,
> Aljoscha
>> On 21 Oct 2015, at 19:32, Hamilton, Paul <
[hidden email]> wrote:
>>
>> Hi Aljoscha,
>>
>> Thanks a lot for your Trigger implementation, definitely helped provide
>> some direction. It appears to be working well for our use case. One
>> thing I have noticed now that I have pulled the state API changes in is
>> that key based state within a window function does not appear to be
>> working. Perhaps I am not using it correctly now that the API has
>> changed. Previously we had done something like this within the
>> RichWindowFunction:
>>
>> @Override
>> public void open(final Configuration parameters) throws Exception {
>> state = getRuntimeContext().getOperatorState("state", new StatePojo(),
>> true);
>> }
>>
>> Based on the API changes I switched it to:
>>
>> @Override
>> public void open(final Configuration parameters) throws Exception {
>> state = getRuntimeContext().getKeyValueState("state", StatePojo.class,
>> new StatePojo());
>> }
>>
>>
>> But the state doesn’t seem to be partitioned based on the key. I haven’t
>> had much time to play around with it, so its certainly possible that I
>> messed something up while refactoring to the API change. I will look at
>> it further when I get a chance, but if you have any thoughts they are much
>> appreciated.
>>
>>
>> Thanks,
>> Paul Hamilton
>>
>>
>> On 10/17/15, 6:39 AM, "Aljoscha Krettek" <
[hidden email]> wrote:
>>
>>> Hi Paul,
>>> it’s good to see people interested in this. I sketched a Trigger that
>>> should fit your requirements:
>>>
https://gist.github.com/aljoscha/a7c6f22548e7d24bc4ac>>>
>>> You can use it like this:
>>>
>>> DataStream<> input = …
>>> DataStream<> result = input
>>> .keyBy(“session-id”)
>>> .window(GlobalWindows.create())
>>> .trigger(new SessionTrigger(timeout, maxElements))
>>> .apply(new MyWindowFunction())
>>>
>>> The Trigger uses the new state API that I’m currently introducing in a
>>> new Pull Request. It should be merged very soon, before the 0.10 release.
>>>
>>> This implementation has one caveat, though. It cannot deal with elements
>>> that belong to different sessions that arrive intermingled with other
>>> sessions. The reason is that Flink does not yet support merging the
>>> windows that the WindowAssigner assigns as, for example, the Cloud
>>> Dataflow API supports. This means that elements cannot be assigned to
>>> session windows, instead the workaround with the GlobalWindow has to be
>>> used. I want to tackle this for the release after 0.10, however.
>>>
>>> Please let us know if you need more information. I’m always happy to help
>>> in these interesting cases at the bleeding edge of what is possible. :-)
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>>> On 16 Oct 2015, at 19:36, Hamilton, Paul <
[hidden email]>
>>>> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I am attempting to make use of the new window APIs in streaming to
>>>> implement a session based window and am not sure if the currently
>>>> provided
>>>> functionality handles my use case. Specifically what I want to do is
>>>> something conceptually similar to a ³Sessions.withGapDuration(Š)² window
>>>> in Google DataFlow.
>>>>
>>>> Assuming the events are keyed by session id. I would like to use the
>>>> event time and the watermarking functionality to trigger a window after
>>>> the ³end of a session² (no events for a given session received within x
>>>> amount of time). With watermarking this would mean trigger when a
>>>> watermark is seen that is > (the time of the last event + session
>>>> timeout). Also I want to perform an early triggering of the window
>>>> after a
>>>> given number of events have been received.
>>>>
>>>> Is it currently possible to do this with the current combination of
>>>> window
>>>> assigners and triggers? I am happy to write custom triggers etc, but
>>>> wanted to make sure it wasn¹t already available before going down that
>>>> road.
>>>>
>>>> Thanks,
>>>>
>>>> Paul Hamilton
>>>> Hybris Software
>>>>
>>>>
>>>
>>
>