Flink QueryableState with Sliding Window on RocksDB

Biplob Biswas
Hi,

We recently moved from Spark Streaming to Flink for our organization's stream processing, and we are in the process of reducing the number of external calls as much as possible. Earlier we were using HBase to store the incoming data, but we now want to try out stateful operations in Flink.

To that end, we have decided that we need a sliding window of size 180 days with a slide interval of 1 day, so that we keep 180 days of state at any given time. This state would be at most around 40-50 GB for the 180 days, so we thought of using RocksDB for state storage.
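
As a rough illustration of what a 180-day window with a 1-day slide implies: with sliding windows, each event falls into size / slide overlapping windows, which here is 180. The sketch below is plain Java, not Flink API; the class and method names are hypothetical, and it assumes windows aligned to the epoch with no offset.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of sliding-window assignment (hypothetical names, not Flink API). */
final class SlidingWindowMath {
    static final long SIZE_MS = Duration.ofDays(180).toMillis();
    static final long SLIDE_MS = Duration.ofDays(1).toMillis();

    /** Start timestamps of every window [start, start + size) that contains the timestamp. */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest slide boundary at or before the timestamp (assumes zero offset).
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }
}
```

Calling SlidingWindowMath.windowStarts(t, SIZE_MS, SLIDE_MS) for any timestamp t returns 180 window starts, so a design that stores each event once per window would hold it 180 times.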

The job flow we have in mind takes the incoming events plus some extra information:

events.keyBy(eventTuple -> eventTuple.getEventUID()).flatMap(new UpdatedTxnState());

where UpdatedTxnState is an extension of the RichFlatMapFunction class and looks something like this:


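import java.util.List;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class UpdatedTxnState extends RichFlatMapFunction<Tuple3<String, List<String>, EventType>, Tuple2<String, EventType>> {

  // Per-key state: holds only the most recent record received for the current key
  private ValueState<Tuple3<EventType, List<String>, String>> txnState;

  @Override
  public void open(Configuration config) throws Exception {
    // Value state that keeps the latest record per key
    ValueStateDescriptor<Tuple3<EventType, List<String>, String>> stateDescriptor = new ValueStateDescriptor<>(
            "transaction", TypeInformation.of(new TypeHint<Tuple3<EventType, List<String>, String>>() {
    }));

    // Expose the state to queryable-state clients under the name "transaction"
    stateDescriptor.setQueryable("transaction");

    this.txnState = getRuntimeContext().getState(stateDescriptor);
  }

  @Override
  public void flatMap(Tuple3<String, List<String>, EventType> input,
                      Collector<Tuple2<String, EventType>> output) throws Exception {

    // Flink's Java tuples expose their fields as f0, f1, f2 (there is no _1(), _2(), _3())
    txnState.update(new Tuple3<>(input.f2, input.f1, input.f0));

    output.collect(new Tuple2<>(input.f0, input.f2));
  }
}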


Now, I have a couple of questions:
1. How can I create a sliding window on top of this state? I can think of doing a keyBy on the output of the flatMap, but that doesn't really make much sense to me, and I didn't find a way to build state after windowing.
2. Can I query the state by the name I defined here, "transaction", from anywhere in my job?

Thanks,
Biplob

Re: Flink QueryableState with Sliding Window on RocksDB

Fabian Hueske-2
Hi Biplob,

What do you mean by "creating a sliding window on top of a state"?
Sliding windows are typically defined on streams (data in motion) and not on state (data at rest).

It seems that UpdatedTxnState always holds the last record that was received per key. Do you want to compute the windows always on the last records you received per key?
This would mean that you need to retract the overwritten values, i.e., remove them from the result, and add the new values to the result.
This can be implemented but requires a good design. You cannot use the built-in sliding windows of the DataStream because they do not support retraction.
I think you have to implement this functionality yourself.
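
To make the retraction idea concrete, here is a minimal plain-Java sketch, assuming the result is a running sum over the latest value of each key. The class name and the choice of a sum are illustrative assumptions, not something taken from Flink or from the job in question.

```java
import java.util.HashMap;
import java.util.Map;

/** Keeps a sum over the latest value per key; overwritten values are retracted first. */
final class RetractingSum {
    private final Map<String, Long> lastValueByKey = new HashMap<>();
    private long total;

    /** Replaces the value for the key and returns the updated total. */
    long update(String key, long newValue) {
        Long old = lastValueByKey.put(key, newValue);
        if (old != null) {
            total -= old; // retract the overwritten value from the result
        }
        total += newValue; // add the new value to the result
        return total;
    }

    long total() {
        return total;
    }
}
```

For example, after update("a", 5), update("b", 3) and update("a", 2), the total is 5, because the earlier contribution of key "a" was removed before the new one was added.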

Best, Fabian





2017-07-28 15:34 GMT+02:00 Biplob Biswas <[hidden email]>:
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-QueryableState-with-Sliding-Window-on-RocksDB-tp14514.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.


Re: Flink QueryableState with Sliding Window on RocksDB

Biplob Biswas
Thanks for the reply, Fabian. I was reconsidering my design and the requirements, and what I described earlier was partially confusing.

I realized that a session window is a better fit for this scenario: I want a value to be updated per key, with the session gap restarting on every update, and the value removed once the gap elapses without an update.

From what I have read so far about session windows (and windows in general), I can perform aggregates over them. What I was thinking is that, rather than performing an aggregate or reduce, I could simply replace the old value.

Now my problem is: would such a state be queryable?

You mentioned that sliding windows are defined on streams; is it the same for session windows?

"I basically want a state which is updated by key, and where records self-destruct after a fixed amount of time if not updated (basically records being part of a session), such that the state doesn't grow indefinitely."

Are there any abstractions for this? If not, could we discuss how it can be done otherwise?

Thanks a lot,
Biplob

Re: Flink QueryableState with Sliding Window on RocksDB

Fabian Hueske-2
Hi Biplob,

given these requirements, I would rather not use a window but implement the functionality with a stateful ProcessFunction.
A ProcessFunction can register timers, e.g., to remove inactive state. The state of a ProcessFunction can be made queryable.
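
The timer-based cleanup can be modelled in plain Java as follows. This is only a sketch of the logic, not Flink API: in a real ProcessFunction the map would be keyed state and the queue would be replaced by timers registered with the timer service. All names here are hypothetical, and time is passed in explicitly to keep the example deterministic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;

/** Plain-Java model of per-key state that is cleared after a period of inactivity. */
final class ExpiringKeyedState {
    private static final class Entry {
        final String value;
        final long lastUpdateMs;
        Entry(String value, long lastUpdateMs) { this.value = value; this.lastUpdateMs = lastUpdateMs; }
    }

    private static final class Timer {
        final String key;
        final long fireAtMs;
        Timer(String key, long fireAtMs) { this.key = key; this.fireAtMs = fireAtMs; }
    }

    private final long ttlMs;
    private final Map<String, Entry> state = new HashMap<>();
    private final PriorityQueue<Timer> timers =
            new PriorityQueue<>((a, b) -> Long.compare(a.fireAtMs, b.fireAtMs));

    ExpiringKeyedState(long ttlMs) { this.ttlMs = ttlMs; }

    /** Overwrites the value for the key and registers a cleanup timer ttlMs later. */
    void update(String key, String value, long nowMs) {
        state.put(key, new Entry(value, nowMs));
        timers.add(new Timer(key, nowMs + ttlMs));
    }

    /** Fires all timers due at or before nowMs. */
    void advanceTo(long nowMs) {
        while (!timers.isEmpty() && timers.peek().fireAtMs <= nowMs) {
            Timer timer = timers.poll();
            Entry entry = state.get(timer.key);
            // A timer is stale if the key was updated after the timer was registered.
            if (entry != null && entry.lastUpdateMs + ttlMs <= timer.fireAtMs) {
                state.remove(timer.key);
            }
        }
    }

    Optional<String> get(String key) {
        Entry entry = state.get(key);
        return entry == null ? Optional.empty() : Optional.of(entry.value);
    }
}
```

Each update registers a new timer instead of cancelling the old one; when an old timer fires, it is ignored because the entry has been refreshed since. This gives the session-like behaviour described in the question: a key disappears only after a full gap without updates.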

Best, Fabian

2017-07-31 9:52 GMT+02:00 Biplob Biswas <[hidden email]>:
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-QueryableState-with-Sliding-Window-on-RocksDB-tp14514p14549.html

Re: Flink QueryableState with Sliding Window on RocksDB

Biplob Biswas
Hi Fabian,

Thanks a lot for pointing that out. I will read about it and give it a try.

Regards,
Biplob

Re: Flink QueryableState with Sliding Window on RocksDB

Biplob Biswas
Hi Fabian,

I read about the ProcessFunction and it seems a perfect fit for my requirement. However, while reading more about queryable state, I found that it's not meant for lookups within a job (your comment in the following link).

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Best-practices-to-maintain-reference-data-for-Flink-Jobs-td13215.html#a13233

The thing is, I wanted to maintain a local state store that is queryable within the same job, such that one operator creates and updates it and another operator queries the store by a given key.

I was hoping this would be possible, but that seems not to be the case. Can you shed some light on this and, if possible, recommend some alternatives?

Regards,
Biplob

Re: Flink QueryableState with Sliding Window on RocksDB

Fabian Hueske-2
I am not sure that this is impossible, but it is not the use case queryable state was designed for.
I don't know the details of your application, but you could try to merge the updating and the querying operators into a single one.

You could connect two streams with connect() and use a keyed CoProcessFunction. This has the advantage that all state accesses are local.
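
The shape of such a merged operator can be sketched in plain Java. This is a model of the pattern only, not Flink API: the class and method names are hypothetical, and the HashMap stands in for keyed state that Flink would scope to the current key. It assumes both inputs are keyed by the same key.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of one operator with two inputs sharing the same keyed state. */
final class UpdateAndLookupOperator {
    private final Map<String, String> stateByKey = new HashMap<>();

    /** First input: records that create or overwrite the state for their key. */
    void processUpdate(String key, String value) {
        stateByKey.put(key, value);
    }

    /** Second input: records that read the state for their key and emit an enriched result. */
    String processQuery(String key) {
        String current = stateByKey.get(key);
        return key + " -> " + (current == null ? "<no state>" : current);
    }
}
```

Because the update and the lookup happen in the same operator instance, the read is a local map access rather than a network request. The restriction is that a lookup can only see the state of the key the query record itself is keyed by.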

Cheers, Fabian

2017-07-31 13:24 GMT+02:00 Biplob Biswas <[hidden email]>:
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-QueryableState-with-Sliding-Window-on-RocksDB-tp14514p14555.html

Re: Flink QueryableState with Sliding Window on RocksDB

Biplob Biswas
Hi Fabian,

Thanks for the insight. I am currently exploring QueryableStateClient and will attempt to get the value for a given key using the getKvState() function.
I was confused about the jobId, but I am expecting that this would give me the JobID of the current job:
ExecutionEnvironment.getExecutionEnvironment().getId()

I thought about doing the update and the lookup from a single operator, but I believe my code would then be a mess, with a lot of logic and no logical separation, so that's the last thing I want to try.

My team is also exploring KStreams with KTables, and they claim it does what we want to do; I will update with any further information. If possible, I would also like to contribute to Flink so that keyed state can be used natively within the same job.

Thanks for the help,
Biplob

Re: Flink QueryableState with Sliding Window on RocksDB

Fabian Hueske-2
Having an operator that updates state from one stream and queries it to process the other stream is actually a common pattern.
As I said, I don't know your use case but I don't think that a CoProcessFunction would result in a mess.

QueryableState will have quite a bit of overhead because the request and response will always go over the network.

2017-07-31 15:23 GMT+02:00 Biplob Biswas <[hidden email]>:
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-QueryableState-with-Sliding-Window-on-RocksDB-tp14514p14558.html

Re: Flink QueryableState with Sliding Window on RocksDB

Biplob Biswas
Hi Fabian,

I am not really sure that a CoProcessFunction would be useful for my use case, which, in short, is as follows:

1) Create 2 different local state stores, each holding a 1-N relationship, e.g. 1 -> [A,B,C] and A -> [1,2,3].
2) Based on the key A, get the list of elements [1,2,3], then iterate over this list and, for each of the keys 1,2,3, query the second store to get its list of elements.
3) Do this to a depth of 1.
4) Now, based on key A again, perform merge operations and emit the merged output.
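
The four steps above can be sketched in plain Java as below. This is a single-process model with hypothetical names, not Flink API, and it assumes the "merge" is a set union, which the description does not specify. It also ignores partitioning: in a Flink job the two stores would be keyed by different keys, which is exactly what makes the cross-store lookup hard.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Plain-Java model of two 1-N stores with a depth-1 lookup and merge. */
final class TwoStoreLookup {
    private final Map<String, Set<String>> numbersByLetter = new HashMap<>(); // A -> [1,2,3]
    private final Map<String, Set<String>> lettersByNumber = new HashMap<>(); // 1 -> [A,B,C]

    /** Step 1: record the relationship in both stores. */
    void link(String number, String letter) {
        lettersByNumber.computeIfAbsent(number, k -> new TreeSet<>()).add(letter);
        numbersByLetter.computeIfAbsent(letter, k -> new TreeSet<>()).add(number);
    }

    /** Steps 2-4: look up the letter, follow each number one level deep, and merge the results. */
    Set<String> mergeFor(String letter) {
        Set<String> merged = new TreeSet<>();
        Set<String> numbers = numbersByLetter.getOrDefault(letter, Collections.<String>emptySet());
        for (String number : numbers) {
            merged.addAll(lettersByNumber.getOrDefault(number, Collections.<String>emptySet()));
        }
        return merged;
    }
}
```

For example, after linking 1 to A, B and C, 2 to A and D, and 3 to A, mergeFor("A") yields [A, B, C, D].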

So, I can't see how to keep the 2 keyed states together when I need to query them by arbitrary keys and not just by the key of one store. That's why we need 2 queryable states that can be queried together in the next operator.

That's why I am not very optimistic about the CoProcessFunction for my case. Maybe I am wrong and have missed something, so any insights would be useful!

Regards
Biplob