Hi,
We recently moved from Spark Streaming to Flink for stream processing in our organization, and we are trying to reduce the number of external calls as much as possible. Earlier we used HBase to store the incoming data, but we now want to try stateful operations on top of Flink.

To that end, we have decided that we need a sliding window of size 180 days with a slide interval of 1 day, so that we keep 180 days of state at any given time. This state would be at most around 40-50 GB for the 180 days, so we thought of using RocksDB for state storage.

The job flow we have in mind takes the incoming events plus some extra information:

    events.keyBy(eventTuple -> eventTuple.getEventUID())
          .flatMap(new UpdatedTxnState());

where UpdatedTxnState extends the RichFlatMapFunction class and looks something like this:

    import java.util.List;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class UpdatedTxnState
            extends RichFlatMapFunction<Tuple3<String, List<String>, EventType>, Tuple2<String, EventType>> {

        private ValueState<Tuple3<EventType, List<String>, String>> txnState;

        @Override
        public void open(Configuration config) throws Exception {
            // Value state that holds the latest record per key
            ValueStateDescriptor<Tuple3<EventType, List<String>, String>> stateDescriptor =
                    new ValueStateDescriptor<>(
                            "transaction",
                            TypeInformation.of(new TypeHint<Tuple3<EventType, List<String>, String>>() { }));
            stateDescriptor.setQueryable("transaction");
            this.txnState = getRuntimeContext().getState(stateDescriptor);
        }

        @Override
        public void flatMap(Tuple3<String, List<String>, EventType> input,
                            Collector<Tuple2<String, EventType>> output) throws Exception {
            // Overwrite the stored record with the incoming one, then forward (UID, type)
            txnState.update(new Tuple3<>(input.f2, input.f1, input.f0));
            output.collect(new Tuple2<>(input.f0, input.f2));
        }
    }

Now, I have a couple of questions:

1. How can I create a sliding window on top of this state? I can think of doing a keyBy on the output of the flatMap, but that doesn't really make sense to me, and I didn't find a way to build state after windowing.
2. Can I query the state using the state name I defined here, "transaction", anywhere in my job?

Thanks,
Biplob
Hi Biplob,

It seems that UpdatedTxnState always holds the last record that was received per key. Do you always want to compute the windows on the last record received per key? That would mean that you need to retract the overwritten values, i.e., remove them from the result and add the new values to the result. This can be implemented but requires a good design. You cannot use the built-in sliding windows of the DataStream API because they do not support retraction. I think you have to implement this functionality yourself.

Best,
Fabian

2017-07-28 15:34 GMT+02:00 Biplob Biswas <[hidden email]>:
> Hi,
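To make the retraction idea above concrete, here is a minimal, framework-free sketch in plain Java. It is not Flink API: it assumes a hypothetical numeric value per key and a running sum as the result, and the names RetractingSum, update and currentSum are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of "retract the old value, add the new one".
// RetractingSum and its methods are hypothetical names, not Flink classes.
final class RetractingSum {
    private final Map<String, Long> lastValuePerKey = new HashMap<>();
    private long sum = 0L;

    /** Records the latest value for a key and returns the updated sum. */
    long update(String key, long newValue) {
        Long previous = lastValuePerKey.put(key, newValue);
        if (previous != null) {
            sum -= previous; // retract the value that was just overwritten
        }
        sum += newValue;     // add the new value to the result
        return sum;
    }

    /** Returns the sum over the latest value of every key seen so far. */
    long currentSum() {
        return sum;
    }
}
```

The point of the sketch is that the result only ever reflects the latest value per key, which is the behaviour a plain sliding-window aggregate would not give.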
Thanks Fabian for the reply. I was reconsidering my design and the requirement, and what I described earlier is partially confusing.

I realized that a session window is a better fit for this scenario: I want a value to be updated per key, with every update resetting the session so that it waits for the gap period again, after which the value should be removed. From what I have read so far about session windows (and windows in general), I can perform aggregates over them. My thinking was that rather than performing an aggregate or reduce, I could simply replace the old value.

Now my problem is: would such a state be queryable? You mentioned that sliding windows are defined on streams; is it the same for session windows?

"I basically want a state which updates by key, and where records can self-destruct after a fixed amount of time (basically records being part of a session) if not updated (such that the state doesn't grow indefinitely)?"

Are there any abstractions that do this, and if not, could we discuss whether it can be done some other way?

Thanks a lot,
Biplob
Hi Biplob,

Given these requirements, I would rather not use a window but implement the functionality with a stateful ProcessFunction.

2017-07-31 9:52 GMT+02:00 Biplob Biswas <[hidden email]>:
> Thanks Fabian for the reply, I was reconsidering my design and the
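As a rough illustration of the behaviour requested in this exchange (a value per key that is replaced on update and removed after a gap without updates), here is a framework-free sketch in plain Java. It is a model only: in Flink the map would be keyed state and onTimer would be driven by registered timers, whereas here ExpiringStore, its methods and the explicit time arguments are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of per-key state that expires after a gap without updates.
// ExpiringStore is a hypothetical stand-in, not Flink API.
final class ExpiringStore<V> {
    private static final class Entry<V> {
        final V value;
        final long lastUpdate;

        Entry(V value, long lastUpdate) {
            this.value = value;
            this.lastUpdate = lastUpdate;
        }
    }

    private final long gapMillis;
    private final Map<String, Entry<V>> state = new HashMap<>();

    ExpiringStore(long gapMillis) {
        this.gapMillis = gapMillis;
    }

    /** Replaces the value for a key and restarts its expiry gap. */
    void update(String key, V value, long nowMillis) {
        state.put(key, new Entry<>(value, nowMillis));
    }

    /** Models a timer firing: drops every key not updated within the gap. */
    void onTimer(long nowMillis) {
        state.values().removeIf(e -> nowMillis - e.lastUpdate >= gapMillis);
    }

    /** Returns the stored value, or null if the key was never seen or already removed. */
    V get(String key) {
        Entry<V> e = state.get(key);
        return e == null ? null : e.value;
    }
}
```

Because every update overwrites the timestamp, a key that keeps receiving records is never removed, while an idle key disappears the next time the timer logic runs, so the state cannot grow indefinitely.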
Hi Fabian,
Thanks a lot for pointing that out. I will read about it and give it a try.

Regards,
Biplob
Hi Fabian,
I read about the ProcessFunction and it seems a perfect fit for my requirement. However, while reading more about queryable state, I found that it's not meant for lookups within a job (your comment in the following link):

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Best-practices-to-maintain-reference-data-for-Flink-Jobs-td13215.html#a13233

The thing is, I wanted to maintain a local state store which is queryable within the same job, such that one operator creates and updates it and another operator then queries the store based on a given key. I was hoping this would be possible, but it seems not to be the case. Can you shed some light on this and, if possible, recommend some alternatives?

Regards,
Biplob
I am not sure that this is impossible, but it is not the use case queryable state was designed for. I don't know the details of your application, but you could try to merge the updating and the querying operators into a single one.

2017-07-31 13:24 GMT+02:00 Biplob Biswas <[hidden email]>:
> Hi Fabian,
Hi Fabian,
Thanks for the insight. I am currently exploring QueryableStateClient and will attempt to get the value for a given key using its getKvState() function. I was confused about the job ID, but I expect the following would give me the job ID of the current job:

    ExecutionEnvironment.getExecutionEnvironment().getId()

I thought about doing the update and the lookup in a single operator, but I believe my code would then be a mess, with a lot of logic and no logical separation, so that is the last thing I want to try.

My team is also exploring KStreams with KTables, and they claim that it does what we want; I will update with any further information. If possible, I would also like to contribute to Flink so that keyed state can be used natively within the same job.

Thanks for the help,
Biplob
Having an operator that updates state from one stream and queries it to process the other stream is actually a common pattern. As I said, I don't know your use case, but I don't think that a CoProcessFunction would result in a mess.

2017-07-31 15:23 GMT+02:00 Biplob Biswas <[hidden email]>:
> Hi Fabian,
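The pattern described here, one input updating state and the other input reading it inside the same operator, can be modelled without any framework as below. This is only a sketch: the method names mirror CoProcessFunction's processElement1 and processElement2, but UpdateAndLookup is a hypothetical plain-Java class, and the in-memory map stands in for what would be keyed state in Flink.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Plain-Java model of a single operator with two inputs:
// input 1 updates the per-key state, input 2 reads it.
// UpdateAndLookup is a hypothetical stand-in, not Flink API.
final class UpdateAndLookup {
    private final Map<String, String> statePerKey = new HashMap<>();

    /** First input: store the latest value for the key. */
    void processElement1(String key, String value) {
        statePerKey.put(key, value);
    }

    /** Second input: look up whatever state currently exists for the key. */
    Optional<String> processElement2(String key) {
        return Optional.ofNullable(statePerKey.get(key));
    }
}
```

Keeping the two handlers as separate methods is what preserves the logical separation between updating and querying, even though both live in one class.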
Hi Fabian,
I am not really sure that using a CoProcessFunction would be useful for my use case. In short, it can be explained as follows:

1) Create 2 different local state stores, each with a 1-N relationship, e.g. 1 -> [A,B,C] and A -> [1,2,3].
2) Based on the key A, get the list of elements [1,2,3], then iterate over this list and, based on the keys 1, 2, 3, query the second store to get the list of elements.
3) Do this till a depth of 1.
4) Now, based on key A again, perform merge operations and emit the merged output.

So I can't imagine having 2 keyed states together when I need to query them randomly and not just on the key of one store. That's why we need 2 queryable states which can be queried together in the next operator, and why I am not very optimistic about the CoProcessFunction for my case. Maybe I am wrong and have missed something, so any insights would be useful!

Regards,
Biplob
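The lookup described in steps 1) to 4) can be sketched in plain Java as follows. This is a framework-free model with two in-memory maps standing in for the two state stores, not a Flink solution; the names TwoStoreLookup, expand, letterToNumbers and numberToLetters are hypothetical, and the merge step is assumed to be a simple union of the looked-up lists.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java model of the two-store lookup: A -> [1,2,3], then 1 -> [A,B,C], and so on.
// All names are hypothetical; the "merge" is assumed to be a union.
final class TwoStoreLookup {
    /** Follows startKey through the first store, then each result through the second (depth 1). */
    static Set<String> expand(String startKey,
                              Map<String, List<String>> letterToNumbers,
                              Map<String, List<String>> numberToLetters) {
        Set<String> merged = new LinkedHashSet<>();
        List<String> numbers = letterToNumbers.getOrDefault(startKey, Collections.emptyList());
        for (String number : numbers) {
            // Each intermediate key needs a lookup under a different key than startKey
            merged.addAll(numberToLetters.getOrDefault(number, Collections.emptyList()));
        }
        return merged;
    }
}
```

The inner lookup is the crux of the concern raised above: it reads the second store under keys other than the one currently being processed, which state scoped to the current key does not directly allow.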