Where does Flink store intermediate state and triggering/scheduling a delayed event?

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

Where does Flink store intermediate state and triggering/scheduling a delayed event?

Soumya Simanta
I'm getting started with Flink and had a very fundamental doubt. 

1) Where does Flink capture/store intermediate state? 

For example, two streams of data have a common key. The streams can lag in time (second, hours or even days). My understanding is that Flink somehow needs to store the data from the first (faster) stream so that it can match and join the data with the second(slower) stream. 

2) Is there a mechanism to trigger/schedule a delayed event in Flink? 

Thanks
-Soumya



Reply | Threaded
Open this post in threaded view
|

Re: Where does Flink store intermediate state and triggering/scheduling a delayed event?

Fabian Hueske-2
Hi,

1) At the moment, state is kept on the JVM heap in a regular HashMap.

However, we added an interface for pluggable state backends. State backends store the operator state (Flink's built-in window operators are based on operator state as well). A pull request to add a RocksDB backend (going to disk) will be merged soon [1]. Another backend using Flink's managed memory is planned.

2) I am not sure what you mean by trigger / schedule a delayed event, but have a few pointers that might be helpful:
- Flink can handle late arriving events. Check the event-time feature [2].
- Flink's window triggers can be used to schedule window computations [3]
- You can implement a custom source function that emits / triggers events.

Best, Fabian

2016-02-03 5:39 GMT+01:00 Soumya Simanta <[hidden email]>:
I'm getting started with Flink and had a very fundamental doubt. 

1) Where does Flink capture/store intermediate state? 

For example, two streams of data have a common key. The streams can lag in time (second, hours or even days). My understanding is that Flink somehow needs to store the data from the first (faster) stream so that it can match and join the data with the second(slower) stream. 

2) Is there a mechanism to trigger/schedule a delayed event in Flink? 

Thanks
-Soumya




Reply | Threaded
Open this post in threaded view
|

Re: Where does Flink store intermediate state and triggering/scheduling a delayed event?

Anwar Rizal
Allow me to jump to this very interesting discussion. 

The 2nd point is actually an interesting question. 

I understand that we can set a timestamp of event in Flink. What if we set the timestamp to somewhere in the future, for example 24 hours from now ?
Can Flink handle this case ?


Also , I'm still unclear whether the windowing can also be backed up by a backend like RocksDB. Concretely, can we have a time window of 24 hours while the tps is 100 TPS ?

Anwar.

On Wed, Feb 3, 2016 at 10:12 AM, Fabian Hueske <[hidden email]> wrote:
Hi,

1) At the moment, state is kept on the JVM heap in a regular HashMap.

However, we added an interface for pluggable state backends. State backends store the operator state (Flink's built-in window operators are based on operator state as well). A pull request to add a RocksDB backend (going to disk) will be merged soon [1]. Another backend using Flink's managed memory is planned.

2) I am not sure what you mean by trigger / schedule a delayed event, but have a few pointers that might be helpful:
- Flink can handle late arriving events. Check the event-time feature [2].
- Flink's window triggers can be used to schedule window computations [3]
- You can implement a custom source function that emits / triggers events.

Best, Fabian

2016-02-03 5:39 GMT+01:00 Soumya Simanta <[hidden email]>:
I'm getting started with Flink and had a very fundamental doubt. 

1) Where does Flink capture/store intermediate state? 

For example, two streams of data have a common key. The streams can lag in time (second, hours or even days). My understanding is that Flink somehow needs to store the data from the first (faster) stream so that it can match and join the data with the second(slower) stream. 

2) Is there a mechanism to trigger/schedule a delayed event in Flink? 

Thanks
-Soumya





Reply | Threaded
Open this post in threaded view
|

Re: Where does Flink store intermediate state and triggering/scheduling a delayed event?

Aljoscha Krettek
Hi,
with TPS you mean tuples-per-second? I have an open pull request that changes the WindowOperator to work on a partitioned state abstraction. In the pull request I also add a state backend that uses RocksDB, so it it possible.

The size of the windows you can keep also depends on the window function, if it is a ReduceFunction then the window result can be incrementally computed and the state that we have to keep is very small. For a WindowFunction that takes an Iterable of all the window elements the state can grow very large, of course.

Cheers,
Aljoscha
> On 03 Feb 2016, at 11:17, Anwar Rizal <[hidden email]> wrote:
>
> point

Reply | Threaded
Open this post in threaded view
|

Re: Where does Flink store intermediate state and triggering/scheduling a delayed event?

Stephan Ewen
You can have timestamps that are very much out-of-order (in the future, specifically). The window operator assigns them to the specific window. The window operators can hold many windows concurrently, which are in progress at the same time.

Windows are then flushed once the triggers fire (after a time, or at a watermark).



On Wed, Feb 3, 2016 at 12:11 PM, Aljoscha Krettek <[hidden email]> wrote:
Hi,
with TPS you mean tuples-per-second? I have an open pull request that changes the WindowOperator to work on a partitioned state abstraction. In the pull request I also add a state backend that uses RocksDB, so it it possible.

The size of the windows you can keep also depends on the window function, if it is a ReduceFunction then the window result can be incrementally computed and the state that we have to keep is very small. For a WindowFunction that takes an Iterable of all the window elements the state can grow very large, of course.

Cheers,
Aljoscha
> On 03 Feb 2016, at 11:17, Anwar Rizal <[hidden email]> wrote:
>
> point


Reply | Threaded
Open this post in threaded view
|

Re: Where does Flink store intermediate state and triggering/scheduling a delayed event?

Soumya Simanta
In reply to this post by Fabian Hueske-2
Fabian, 

Thank a lot for your response. Really appreciated. I've some additional questions (please see inline) 

On Wed, Feb 3, 2016 at 2:42 PM, Fabian Hueske <[hidden email]> wrote:
Hi,

1) At the moment, state is kept on the JVM heap in a regular HashMap.
Is this state replicated across JVMs in a cluster setup? This also implies that on a single node Flink setup the is a chance of failure.  

However, we added an interface for pluggable state backends. State backends store the operator state (Flink's built-in window operators are based on operator state as well). A pull request to add a RocksDB backend (going to disk) will be merged soon [1]. Another backend using Flink's managed memory is planned.
Will having a state persistence have any impact on performance?  
 

2) I am not sure what you mean by trigger / schedule a delayed event, but have a few pointers that might be helpful:
- Flink can handle late arriving events. Check the event-time feature [2].
- Flink's window triggers can be used to schedule window computations [3]
- You can implement a custom source function that emits / triggers events.
My specific use case is firing an event (after a certain amount of time) based on some data computation that I'm performing on another stream. 
 

2016-02-03 5:39 GMT+01:00 Soumya Simanta <[hidden email]>:
I'm getting started with Flink and had a very fundamental doubt. 

1) Where does Flink capture/store intermediate state? 

For example, two streams of data have a common key. The streams can lag in time (second, hours or even days). My understanding is that Flink somehow needs to store the data from the first (faster) stream so that it can match and join the data with the second(slower) stream. 

2) Is there a mechanism to trigger/schedule a delayed event in Flink? 

Thanks
-Soumya





Reply | Threaded
Open this post in threaded view
|

Re: Where does Flink store intermediate state and triggering/scheduling a delayed event?

Fabian Hueske-2
Hi Soumya,

Operator state is partitioned across JVMs and not replicated. However, it is checkpointed (e.g., to HDFS) at regular intervals to guarantee fault-tolerance with exactly-once semantics. In case of a failure, all operator states are recovered from checkpoints.

The state backend is responsible for the local state management, i.e., how the state that is queried and updated by an operator. Using a state backend such as RocksDB which goes to disk for every request has of course higher latency than doing lookups/updates in a Java HashMap. What you gain is that your local state can grow much larger than the JVMs memory. Since the backend is pluggable, it would be possible to implement a backend with disk persistence and an in-mem cache.

Regarding the use case of firing an event, you can implement it with a custom stream operator in Flink.

Best, Fabian

2016-02-04 10:15 GMT+01:00 Soumya Simanta <[hidden email]>:
Fabian, 

Thank a lot for your response. Really appreciated. I've some additional questions (please see inline) 

On Wed, Feb 3, 2016 at 2:42 PM, Fabian Hueske <[hidden email]> wrote:
Hi,

1) At the moment, state is kept on the JVM heap in a regular HashMap.
Is this state replicated across JVMs in a cluster setup? This also implies that on a single node Flink setup the is a chance of failure.  

However, we added an interface for pluggable state backends. State backends store the operator state (Flink's built-in window operators are based on operator state as well). A pull request to add a RocksDB backend (going to disk) will be merged soon [1]. Another backend using Flink's managed memory is planned.
Will having a state persistence have any impact on performance?  
 

2) I am not sure what you mean by trigger / schedule a delayed event, but have a few pointers that might be helpful:
- Flink can handle late arriving events. Check the event-time feature [2].
- Flink's window triggers can be used to schedule window computations [3]
- You can implement a custom source function that emits / triggers events.
My specific use case is firing an event (after a certain amount of time) based on some data computation that I'm performing on another stream. 
 

2016-02-03 5:39 GMT+01:00 Soumya Simanta <[hidden email]>:
I'm getting started with Flink and had a very fundamental doubt. 

1) Where does Flink capture/store intermediate state? 

For example, two streams of data have a common key. The streams can lag in time (second, hours or even days). My understanding is that Flink somehow needs to store the data from the first (faster) stream so that it can match and join the data with the second(slower) stream. 

2) Is there a mechanism to trigger/schedule a delayed event in Flink? 

Thanks
-Soumya