Hi all,
I'm exploring Flink for our new project. Currently I'm playing with Session Windows with a dynamic gap. In short, I would like to be able to change the value of the gap on demand, for example on a config update. This is the code I have:

    messageStream
        .keyBy(tradeKeySelector)
        .window(ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<EnrichedMessage>() {
            @Override
            public long extract(EnrichedMessage element) {
                // Try to dynamically change the gap here
                // (value in milliseconds).
                return 5000;
            }
        }))
        .process(new CumulativeTransactionOperator())
        .name("Aggregate Transaction Builder");

I would assume something like the "broadcast pattern" here, although that pattern applies to operators and here we are interested in SessionWindowTimeGapExtractor. We would probably keep the gap size in Flink state; I'm not sure whether it has to be keyed state or operator state. Updates will come from an external system.

So I guess what I need here is actually an operator that implements the SessionWindowTimeGapExtractor interface. An instance of this operator would keep/update the state based on config updates and return the gap size like a SessionWindowTimeGapExtractor does.

Would this be a valid approach for this use case? Is there any other way to have such a config in Flink state?

-- 
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Hi KristoffSC,

Firstly, IMO, you can implement this feature by customizing the `SessionWindowTimeGapExtractor`.

Additionally, let me clarify a concept. A component that implements the `SessionWindowTimeGapExtractor` interface is not an operator in Flink. In Flink's concepts, a Window is an operator; it contains several components: an assigner, a trigger, evictors and so on.[1]

In Flink's codebase I did not find a specific implementation of this interface, and it may not be able to access Flink's state. However, you can still implement this interface yourself and get the new dynamic gap value by accessing a third-party system (Kafka, Redis, ZooKeeper...). For each element, when assigning a session window to it, Flink always invokes the `SessionWindowTimeGapExtractor#extract()` method, so this approach makes sense.

Best,
Vino

KristoffSC <[hidden email]> wrote on Thursday, January 2, 2020 at 7:37 PM:
> Hi all,
Thank you for the answer,
the thing is that I would not like to call an external system for each window; rather, I would like to keep the gap size in Flink's state, which I would be able to change from an external system, for example by handling a configUpdate message from Kafka.

So if SessionWindowTimeGapExtractor should not be implemented on operators, then I understand that the SessionWindowTimeGapExtractor implementation will fetch the config value from Flink's state (value/map) according to the state descriptor. However, I would not want to fetch data from the state on each call, because the value will not change that often.

It is a similar case to the Broadcast State pattern, but for SessionWindowTimeGapExtractor.
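Vino suggested reading the gap from an external system inside the extractor, and the concern above is the cost of doing that on every call. One hedged way to reconcile the two, without Flink state, is to cache the fetched value inside the extractor and refresh it only after a fixed interval. This is a sketch under assumptions: `GapSource`, `CachingGapExtractor` and `CountingSource` are hypothetical names, and the refresh interval is an arbitrary choice. So that it compiles without Flink on the classpath, it declares a local interface with the same shape as Flink's `SessionWindowTimeGapExtractor<T>` (a `Serializable` interface with `long extract(T element)`).

```java
import java.io.Serializable;

public class CachingGapExtractorDemo {

    // Local stand-in with the same shape as Flink's SessionWindowTimeGapExtractor<T>,
    // so the sketch compiles without Flink on the classpath.
    interface SessionWindowTimeGapExtractor<T> extends Serializable {
        long extract(T element);
    }

    // Hypothetical lookup of the current gap in an external system (Redis, ZooKeeper, ...).
    interface GapSource extends Serializable {
        long fetchGapMillis();
    }

    // Hypothetical extractor that queries the external source at most once per refresh interval.
    static class CachingGapExtractor<T> implements SessionWindowTimeGapExtractor<T> {
        private final GapSource source;
        private final long refreshIntervalMillis;
        // Transient: a freshly deserialized copy starts empty and fetches on its first call.
        private transient boolean initialized;
        private transient long cachedGapMillis;
        private transient long lastFetchMillis;

        CachingGapExtractor(GapSource source, long refreshIntervalMillis) {
            this.source = source;
            this.refreshIntervalMillis = refreshIntervalMillis;
        }

        @Override
        public long extract(T element) {
            long now = System.currentTimeMillis();
            // Refresh only on the first call or once the cached value is older than the interval.
            if (!initialized || now - lastFetchMillis >= refreshIntervalMillis) {
                cachedGapMillis = source.fetchGapMillis();
                lastFetchMillis = now;
                initialized = true;
            }
            return cachedGapMillis;
        }
    }

    // Test double that counts how often the "external system" is queried.
    static class CountingSource implements GapSource {
        int calls;
        private final long gapMillis;

        CountingSource(long gapMillis) {
            this.gapMillis = gapMillis;
        }

        @Override
        public long fetchGapMillis() {
            calls++;
            return gapMillis;
        }
    }

    public static void main(String[] args) {
        CountingSource source = new CountingSource(5000L);
        CachingGapExtractor<String> extractor = new CachingGapExtractor<>(source, 60_000L);
        for (int i = 0; i < 1000; i++) {
            extractor.extract("trade-" + i);
        }
        System.out.println("external lookups: " + source.calls + ", gap: " + extractor.extract("trade-x"));
    }
}
```

The trade-off is staleness: a config change becomes visible only after up to one refresh interval, and every parallel window task holds its own copy of the extractor, so each copy refreshes independently.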
So I was trying to have something like this:
    PipelineConfigOperator pipelineConfigOperator = new PipelineConfigOperator();

    messageStream
        .connect(pipelineConfigStream)
        .process(pipelineConfigOperator)
        .keyBy(tradeKeySelector)
        .window(ProcessingTimeSessionWindows.withDynamicGap(pipelineConfigOperator))
        .process(new CumulativeTransactionOperator())
        .name("Aggregate Transaction Builder");

where:

    PipelineConfigOperator extends BroadcastProcessFunction<EnrichedMessage, String, EnrichedMessage>
        implements SessionWindowTimeGapExtractor<EnrichedMessage>

    BroadcastStream<String> pipelineConfigStream = configRulesStream
        .broadcast(pipelineConfigStateDescriptor);

    MapStateDescriptor<String, String> pipelineConfigStateDescriptor = new MapStateDescriptor<>(
        "PipelineConfigBroadcastState",
        Types.STRING,
        TypeInformation.of(new TypeHint<String>() { }));

    SingleOutputStreamOperator<String> configRulesStream = env
        .addSource(new FlinkKafkaConsumer<>("pipeline-config", new SimpleStringSchema(), properties))
        .name("Pipeline config stream");

PipelineConfigOperator keeps the config in Broadcast state and a copy of it in a local, transient HashMap. Whenever processBroadcastElement is called, both the Broadcast state and the HashMap are updated.

The problem is that when the "extract" method is called, the HashMap is null even though it was initialized in the open method.

I have implemented the Broadcast State pattern for standard operators, as presented in the documentation, so I'm familiar with this concept. I assumed I could reuse it here. The bottom line is that I do not want to fetch the state every time, only after a config update.
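A likely explanation for the null map, offered as an assumption rather than something confirmed in this thread: the object passed to `.process(...)` and the one passed to `withDynamicGap(...)` are serialized separately with the job, so at runtime the window operator works with its own deserialized copy. Flink calls `open()` only on the copy that runs as the process function; the copy inside the session window assigner never gets `open()` or a runtime context, so its transient map stays null (which would also explain `getRuntimeContext()` throwing on it). The pure-Java sketch below reproduces that effect with a plain Java serialization round trip; `ConfigHolder` and `roundTrip` are hypothetical names, and Flink's real job submission involves more machinery than this.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class TransientFieldDemo {

    // Hypothetical stand-in for PipelineConfigOperator: the config lives in a transient map
    // that is only populated when open() runs.
    static class ConfigHolder implements Serializable {
        private transient Map<String, String> config;

        void open() {
            config = new HashMap<>();
        }

        void onConfigUpdate(String key, String value) {
            config.put(key, value);
        }

        long extract() {
            // Mirrors the reported symptom: the map is null in a copy whose open() never ran.
            if (config == null) {
                throw new IllegalStateException("config map is null");
            }
            return Long.parseLong(config.getOrDefault("gap", "5000"));
        }
    }

    // Java serialization round trip, roughly what happens when user functions are shipped to tasks.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ConfigHolder original = new ConfigHolder();
        ConfigHolder inBroadcastOperator = roundTrip(original); // copy used as the process function
        ConfigHolder inWindowAssigner = roundTrip(original);    // separate copy inside the window assigner

        inBroadcastOperator.open();
        inBroadcastOperator.onConfigUpdate("gap", "10000");
        System.out.println("broadcast copy gap: " + inBroadcastOperator.extract());
        System.out.println("same object? " + (inBroadcastOperator == inWindowAssigner));
        try {
            inWindowAssigner.extract();
        } catch (IllegalStateException e) {
            System.out.println("assigner copy: " + e.getMessage());
        }
    }
}
```

If this is what happens, no amount of care inside the shared class helps: the two roles never see the same instance, which is consistent with the conclusion reached later in the thread.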
Ok,
I did some more tests and, yes, it seems that there is no way to use Flink's state in a class that implements SessionWindowTimeGapExtractor.

Even if I implement this interface on a class that is an operator, whenever the extract method is called it does not have any access to Flink's state. Even calling getRuntimeContext() from it throws an exception.

Are there any plans to add support for Flink state to SessionWindowTimeGapExtractor?
Hi KristoffSC,

>> Are there any plans to add support of Flink State into SessionWindowTimeGapExtractor?

As I said, `SessionWindowTimeGapExtractor` is neither a general UDF nor an operator.

But I cannot give a clear answer. Let me ping [hidden email] to give the answer.

Best,
Vino

KristoffSC <[hidden email]> wrote on Friday, January 3, 2020 at 6:17 AM:
> Ok,
Hi Kristoff,
There are no plans to add state support to the gap extractors, but you could do this using a two-step approach: have an operation in front of the window that keeps track of session gaps and enriches each message with the gap that should be used; the extractor then simply extracts that gap from the message. This is a more modular approach than putting everything into one operator/extractor.

Best,
Aljoscha

> On 3. Jan 2020, at 08:52, vino yang <[hidden email]> wrote:
> Hi KristoffSC,
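A minimal sketch of Aljoscha's two-step approach, modeled in plain Java so it runs without Flink. `GapEnricher` stands in for the operation in front of the window (in a real job, probably the `BroadcastProcessFunction` from the earlier message, updating the gap in `processBroadcastElement` and stamping it onto each message in `processElement`), and `MessageGapExtractor` is a stateless extractor that only reads the stamped value. The `sessionGapMillis` field, the class names and the local `SessionWindowTimeGapExtractor` stand-in are assumptions for illustration.

```java
import java.io.Serializable;

public class GapEnrichmentDemo {

    // Local stand-in with the same shape as Flink's SessionWindowTimeGapExtractor<T>.
    interface SessionWindowTimeGapExtractor<T> extends Serializable {
        long extract(T element);
    }

    // Hypothetical message type; the real EnrichedMessage would gain a field like sessionGapMillis.
    static class EnrichedMessage {
        final String tradeId;
        long sessionGapMillis; // set by the enrichment step

        EnrichedMessage(String tradeId) {
            this.tradeId = tradeId;
        }
    }

    // Step 1: models the operation in front of the window. It remembers the latest gap
    // from config updates and stamps it onto every message that passes through.
    static class GapEnricher {
        private long currentGapMillis;

        GapEnricher(long defaultGapMillis) {
            this.currentGapMillis = defaultGapMillis;
        }

        void onConfigUpdate(long newGapMillis) {
            currentGapMillis = newGapMillis;
        }

        EnrichedMessage enrich(EnrichedMessage msg) {
            msg.sessionGapMillis = currentGapMillis;
            return msg;
        }
    }

    // Step 2: a stateless extractor that just reads the gap carried by the element.
    static class MessageGapExtractor implements SessionWindowTimeGapExtractor<EnrichedMessage> {
        @Override
        public long extract(EnrichedMessage element) {
            return element.sessionGapMillis;
        }
    }

    public static void main(String[] args) {
        GapEnricher enricher = new GapEnricher(5000L);
        MessageGapExtractor extractor = new MessageGapExtractor();
        System.out.println(extractor.extract(enricher.enrich(new EnrichedMessage("t1")))); // 5000
        enricher.onConfigUpdate(10_000L);
        System.out.println(extractor.extract(enricher.enrich(new EnrichedMessage("t2")))); // 10000
    }
}
```

Because the extractor holds no state, it no longer matters that Flink serializes it separately from the enrichment operator: the gap travels with each element through `keyBy`. The Broadcast State documentation also notes that broadcast state is kept in memory, so reading the current gap once per element in the enrichment step should be cheap.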
Hi Aljoscha,
Thanks for the response. This sounds OK to me. It's as if the message carries additional information that can "tell" operators how to handle it. Maybe we could use this approach for other use cases as well.

I will try this approach, thanks.