Queryable State and Windows


Queryable State and Windows

Joe Olson
From what I've read in the documentation, and from the examples I've seen, in order to make state queryable externally to Flink, the state descriptor variables need access to the Flink runtime context.

This means the stream processor has to use one of the 'Rich' function classes, RichFlatMapFunction for example. All the 1.2-SNAPSHOT queryable state examples I have seen revolve around RichFlatMapFunction.

Is there a way to get the runtime context exposed so that you can have state descriptor variables queryable from within a Flink window, while the window is loading?

My processor is built around the following:

.addSource(new FlinkKafkaConsumer010())
.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
.keyBy()
.window(GlobalWindows.create())
.trigger(new myTrigger())
.apply(new myWindowFunction())
.addSink(new mySink())

The only rich object in this chain is the one available in the apply (RichWindowFunction). But that is too late: I want to be able to query what's in the window while it is filling. I know I have access to onElement in the trigger, and I can set up the state descriptor variables there, but the variables don't seem to be exposed to the runtime environment within the trigger.

Is there a way to get queryable state within a Flink window while it is filling?


Re: Queryable State and Windows

Ufuk Celebi
This is not possible at the moment. We discussed this a couple of
times before, but in the end did not want to expose it with the
initial version, because the interfaces are still very raw. This is
definitely on the agenda though.

As a workaround you would have to build a custom Flink version that
calls `setQueryable` on the state descriptors of the WindowOperator.
If there is an easy, non-intrusive way to activate this for the
upcoming 1.2 version, I will try to do it.



(In reply to Joe Olson's message of Mon, Jan 23, 2017 at 2:46 PM.)

Re: Queryable State and Windows

snntr
I just want to add another workaround, which does not need a
self-compiled version. You can use a TimeWindow with CountTrigger.of(1),
combined with a FoldFunction for pre-aggregation and a
RichWindowFunction to update the queryable state. Additionally, you need
a second TimeWindow for the final results. So you are doubling the amount
of state as well as computation, but depending on the circumstances this
might be preferable to tweaking Flink 1.2.

I think Jamie Grier did something similar in one of his presentations on
the topic.
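
The shape of this workaround can be modelled in plain Java, without any Flink dependency. The sketch below is only an illustration of the "two copies of the state" idea, not Flink API code: the class `QueryableWindowSketch` and its methods `onElement`, `query` and `closeWindow` are hypothetical names, and a summing aggregate is assumed in place of whatever the FoldFunction would really compute. One map stands in for the queryable state refreshed on every element (the CountTrigger.of(1) path), the other for the window that produces the final result.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Plain-Java model (no Flink classes) of the two-path workaround:
 * path 1 fires on every element and refreshes a queryable view,
 * path 2 emits the final result only when the window closes.
 * All names here are hypothetical and chosen for illustration.
 */
class QueryableWindowSketch {

    // Path 1: per-key running aggregate, standing in for the queryable
    // state a RichWindowFunction would update after each early firing.
    private final Map<String, Long> queryableView = new HashMap<>();

    // Path 2: per-key aggregate held by the window that yields final results.
    private final Map<String, Long> windowAggregate = new HashMap<>();

    /** Called once per incoming element; each path keeps its own copy. */
    void onElement(String key, long value) {
        // Fold-style pre-aggregation (assumed here to be a sum), published at once.
        queryableView.merge(key, value, Long::sum);
        // The final-result window accumulates the same data independently.
        windowAggregate.merge(key, value, Long::sum);
    }

    /** External read of a window that is still filling. */
    Optional<Long> query(String key) {
        return Optional.ofNullable(queryableView.get(key));
    }

    /** Fires the final window for a key and clears both copies of its state. */
    long closeWindow(String key) {
        queryableView.remove(key);
        Long result = windowAggregate.remove(key);
        return result == null ? 0L : result;
    }

    public static void main(String[] args) {
        QueryableWindowSketch sketch = new QueryableWindowSketch();
        sketch.onElement("sensor-1", 3L);
        sketch.onElement("sensor-1", 4L);
        // Partial result is visible before the window has closed.
        System.out.println(sketch.query("sensor-1"));
        // Final result is emitted once, when the window closes.
        System.out.println(sketch.closeWindow("sensor-1"));
    }
}
```

The cost mentioned above shows up directly: every element is aggregated twice and held in two maps. In a real job the queryable view would live in keyed state registered through a state descriptor inside the rich function.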

Cheers,

Konstantin

(In reply to Ufuk Celebi's message of 23.01.2017 15:39.)

--
Konstantin Knauf * [hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

