Session Window with dynamic gap

KristoffSC
Hi all,
I'm exploring Flink for our new project.

Currently I'm experimenting with session windows with a dynamic gap. In short, I
would like to be able to change the value of the gap on demand, for example
on a config update.

This is the code I have so far:


messageStream
                .keyBy(tradeKeySelector)
                .window(ProcessingTimeSessionWindows.withDynamicGap(
                  new SessionWindowTimeGapExtractor<EnrichedMessage>() {
                    @Override
                    public long extract(EnrichedMessage element) {
                        // Try to dynamically change the gap here.
                        // The value is in milliseconds.
                        return 5000;
                    }
                }))
                .process(new CumulativeTransactionOperator())
                .name("Aggregate Transaction Builder");

I assume something like the "broadcast state pattern" would apply here, although
that pattern is meant for operators, and here we are interested in
SessionWindowTimeGapExtractor.

We will probably keep the gap size in Flink state; I'm not sure whether it has to
be keyed state or operator state. Updates will come from an external system.

So I guess what I need is an operator that implements the
SessionWindowTimeGapExtractor interface. An instance of this operator would
keep and update the state based on config updates and return the gap size, as a
SessionWindowTimeGapExtractor does.

Would this be a valid approach for this use case? Is there any other way to keep
such a config in Flink state?







--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Session Window with dynamic gap

vino yang
Hi KristoffSC,

Firstly, IMO, you can implement this feature by customizing the `SessionWindowTimeGapExtractor`.

Additionally, let me clarify a concept. A component that implements the `SessionWindowTimeGapExtractor` interface should not be an operator in Flink.

In Flink's terms, a window is an operator that contains several components: an assigner, a trigger, an evictor, and so on. [1]

In Flink's codebase, I did not find a specific implementation of this interface, and it may not be able to access Flink's state. However, you can still implement this interface and get the new dynamic gap value by accessing a third-party system (Kafka, Redis, ZooKeeper...). For each element, when assigning a session window to it, Flink always invokes the `SessionWindowTimeGapExtractor#extract()` method. So it makes sense.
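
As a rough illustration of the external-lookup idea, the sketch below caches the fetched gap so that the external system is not queried for every element. Everything in it is an assumption made for illustration: `GapExtractor` is a local stand-in with the same `extract` method as the `SessionWindowTimeGapExtractor` used earlier in the thread, and `GapLoader` and `CachedGapExtractor` are hypothetical names, not Flink classes. The cached fields are transient, so they are rebuilt lazily after the extractor has been serialized and shipped.

```java
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;

// Local stand-in with the same extract(...) method as Flink's
// SessionWindowTimeGapExtractor, so the sketch compiles without Flink.
interface GapExtractor<T> extends Serializable {
    long extract(T element);
}

// Hypothetical serializable lookup, e.g. a Redis or ZooKeeper read.
interface GapLoader extends Serializable {
    long loadGapMillis();
}

class CachedGapExtractor<T> implements GapExtractor<T> {
    private final GapLoader loader;
    private final long refreshIntervalMillis;

    // Transient: not serialized with the extractor; filled lazily on first use.
    private transient boolean loaded;
    private transient long cachedGapMillis;
    private transient long lastLoadNanos;

    CachedGapExtractor(GapLoader loader, long refreshIntervalMillis) {
        this.loader = loader;
        this.refreshIntervalMillis = refreshIntervalMillis;
    }

    @Override
    public long extract(T element) {
        long now = System.nanoTime();
        boolean stale = !loaded
                || (now - lastLoadNanos) >= refreshIntervalMillis * 1_000_000L;
        if (stale) {
            // Only hit the external system when the cached value is too old.
            cachedGapMillis = loader.loadGapMillis();
            lastLoadNanos = now;
            loaded = true;
        }
        return cachedGapMillis;
    }
}

class CachedGapExtractorDemo {
    public static void main(String[] args) {
        AtomicInteger lookups = new AtomicInteger();
        GapLoader loader = () -> {
            lookups.incrementAndGet();
            return 5000L;
        };
        CachedGapExtractor<String> extractor = new CachedGapExtractor<>(loader, 60_000L);
        System.out.println(extractor.extract("first"));  // 5000
        System.out.println(extractor.extract("second")); // 5000
        System.out.println(lookups.get());               // 1
    }
}
```

The trade-off is staleness: a config change becomes visible only after the refresh interval has elapsed, and each parallel instance refreshes on its own schedule.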

Best,
Vino


On Thursday, January 2, 2020 at 7:37 PM, KristoffSC <[hidden email]> wrote:

Re: Session Window with dynamic gap

KristoffSC
Thank you for the answer.

The thing is that I would rather not call an external system for each window.
Instead, I would like to keep the gap size in Flink's state, which I could
change from an external system, for example by handling a configUpdate message
from Kafka.

So if SessionWindowTimeGapExtractor should not be implemented on operators,
then I understand that the SessionWindowTimeGapExtractor implementation would
fetch the config value from Flink's state (value/map) using the state descriptor.

However, I would not want to fetch data from the state on each call, because
it will not change that often. It is a similar case to the broadcast state
pattern, but for SessionWindowTimeGapExtractor.




Re: Session Window with dynamic gap

KristoffSC
So I tried something like this:

PipelineConfigOperator pipelineConfigOperator = new PipelineConfigOperator();

messageStream
                .connect(pipelineConfigStream)
                .process(pipelineConfigOperator)
                .keyBy(tradeKeySelector)
                .window(ProcessingTimeSessionWindows.withDynamicGap(pipelineConfigOperator))
                .process(new CumulativeTransactionOperator())
                .name("Aggregate Transaction Builder");

where:

PipelineConfigOperator extends BroadcastProcessFunction<EnrichedMessage, String, EnrichedMessage>
                implements SessionWindowTimeGapExtractor<EnrichedMessage>


BroadcastStream<String> pipelineConfigStream = configRulesStream
                .broadcast(pipelineConfigStateDescriptor);

MapStateDescriptor<String, String> pipelineConfigStateDescriptor = new MapStateDescriptor<>(
                "PipelineConfigBroadcastState",
                Types.STRING,
                TypeInformation.of(new TypeHint<String>() {}));

SingleOutputStreamOperator<String> configRulesStream = env
                .addSource(new FlinkKafkaConsumer<>("pipeline-config",
                        new SimpleStringSchema(), properties))
                .name("Pipeline config stream");


PipelineConfigOperator keeps the config in broadcast state, with a copy in a
local, transient HashMap.
Whenever processBroadcastElement is called, both the broadcast state and the
HashMap are updated.

The problem is that when the "extract" method is called, the HashMap is null, even
though it was initialized in the open method.

I have implemented the broadcast state pattern for standard operators as
presented in the documentation, so I'm familiar with the concept. I assumed I could
reuse it here. The bottom line is that I don't want to fetch state every
time, only after a config update.
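
One plausible explanation for the null map, assuming Flink serializes user-supplied functions when the job is built and gives each operator its own deserialized copy: the object passed to process(...) and the object passed to withDynamicGap(...) are then different instances at runtime. The open method runs only on the copy inside the process operator, so the transient field on the window's copy keeps its default value of null. The sketch below reproduces that effect with plain Java serialization; ConfigHolder and its methods are hypothetical stand-ins, not Flink classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a function object that is serialized and shipped to workers.
class ConfigHolder implements Serializable {
    // Transient fields are skipped by Java serialization.
    private transient Map<String, String> localConfig;

    // Stands in for a lifecycle hook that only one of the copies receives.
    void open() {
        localConfig = new HashMap<>();
    }

    boolean hasLocalConfig() {
        return localConfig != null;
    }

    // Round-trips the object through Java serialization to get an independent copy.
    static ConfigHolder copyViaSerialization(ConfigHolder original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ConfigHolder) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}

class TransientFieldDemo {
    public static void main(String[] args) {
        ConfigHolder submitted = new ConfigHolder();

        // Two independent copies, as if handed to two different operators.
        ConfigHolder processCopy = ConfigHolder.copyViaSerialization(submitted);
        ConfigHolder windowCopy = ConfigHolder.copyViaSerialization(submitted);

        processCopy.open(); // only this copy is initialized

        System.out.println(processCopy.hasLocalConfig()); // true
        System.out.println(windowCopy.hasLocalConfig());  // false
    }
}
```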





Re: Session Window with dynamic gap

KristoffSC
Ok,
I did some more tests and yes, it seems that there is no way to use Flink's
state in a class that implements SessionWindowTimeGapExtractor.

Even if I implement this interface on a class that is an operator,
whenever the extract method is called it does not have any access to Flink's
state. Even calling getRuntimeContext() from it throws an exception.

Are there any plans to add support of Flink State into
SessionWindowTimeGapExtractor?




Re: Session Window with dynamic gap

vino yang
Hi KristoffSC,

>> Are there any plans to add support of Flink State into SessionWindowTimeGapExtractor?

As I said, `SessionWindowTimeGapExtractor` is neither a general UDF nor an operator.

But I cannot give a clear answer. Let me ping @Aljoscha Krettek to give the answer.

Best,
Vino

On Friday, January 3, 2020 at 6:17 AM, KristoffSC <[hidden email]> wrote:

Re: Session Window with dynamic gap

Aljoscha Krettek
Hi Kristoff,

There are no plans to add state support to the gap extractors, but you could do this with a two-step approach: have an operation in front of the window that keeps track of session gaps and enriches each message with the gap that should be used, and then have the extractor read that gap from the message. This is a more modular approach than putting everything in one operator/extractor.
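
A minimal sketch of this two-step idea, written without Flink dependencies so that it runs on its own. All names are assumptions made for illustration: SessionGapExtractor is a local stand-in with the same extract method as SessionWindowTimeGapExtractor, GapTaggedMessage and its sessionGapMillis field are hypothetical, and GapEnricher only imitates what a stateful function in front of the window would do.

```java
import java.io.Serializable;

// Local stand-in mirroring the extract(...) method of Flink's
// SessionWindowTimeGapExtractor, so the sketch runs without Flink.
interface SessionGapExtractor<T> extends Serializable {
    long extract(T element);
}

// Hypothetical message type; sessionGapMillis is an assumed field name.
class GapTaggedMessage {
    final String tradeKey;
    final long sessionGapMillis;

    GapTaggedMessage(String tradeKey, long sessionGapMillis) {
        this.tradeKey = tradeKey;
        this.sessionGapMillis = sessionGapMillis;
    }
}

// Step 1: remembers the latest configured gap and stamps it on each message.
// In a real job this logic would live in the function placed before the window.
class GapEnricher {
    private long currentGapMillis;

    GapEnricher(long defaultGapMillis) {
        this.currentGapMillis = defaultGapMillis;
    }

    // Called for each config update; non-positive values are ignored.
    void onConfigUpdate(String rawGapMillis) {
        long parsed = Long.parseLong(rawGapMillis.trim());
        if (parsed > 0) {
            currentGapMillis = parsed;
        }
    }

    GapTaggedMessage enrich(String tradeKey) {
        return new GapTaggedMessage(tradeKey, currentGapMillis);
    }
}

// Step 2: a stateless extractor that only reads the gap carried by the message.
class GapFromMessageExtractor implements SessionGapExtractor<GapTaggedMessage> {
    @Override
    public long extract(GapTaggedMessage element) {
        return element.sessionGapMillis;
    }
}

class TwoStepGapDemo {
    public static void main(String[] args) {
        GapEnricher enricher = new GapEnricher(5000L);
        GapFromMessageExtractor extractor = new GapFromMessageExtractor();

        GapTaggedMessage before = enricher.enrich("trade-1");
        enricher.onConfigUpdate("10000");
        GapTaggedMessage after = enricher.enrich("trade-1");

        System.out.println(extractor.extract(before)); // 5000
        System.out.println(extractor.extract(after));  // 10000
    }
}
```

Because the extractor holds no state of its own, it does not matter how many serialized copies of it exist; the gap travels with each message.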

Best,
Aljoscha

> On 3. Jan 2020, at 08:52, vino yang <[hidden email]> wrote:

Re: Session Window with dynamic gap

KristoffSC
Hi Aljoscha,
Thanks for the response.

This sounds OK to me. It's as if the message carries additional information
that can "tell" operators how to handle it. Maybe we could use
this approach for other use cases as well.

I will try this approach, thanks.



