Apache Flink Operator State as Query Cache

16 messages
Apache Flink Operator State as Query Cache

tambunanw
Hi All, 

We have high-density data that requires downsampling. However, the downsampling model is very flexible, depending on the client device and user interaction, so it would be wasteful to precompute the results and store them in a database.

So we want to use Apache Flink to do the downsampling and cache the results for subsequent queries.

We are considering using Flink operator state for this.

Is that the right approach for a memory cache? Or would it be preferable to use an in-memory cache like Redis, etc.?

Any comments will be appreciated. 


Cheers

Re: Apache Flink Operator State as Query Cache

Anwar Rizal

Let me understand your case better here. You have a stream of models and a stream of data. To process the data, you need a way to access your model from the subsequent stream operations (map, filter, flatMap, ...).
I'm not sure in which cases Operator State is a good choice, but I think you can also live without it.

val modelStream = ... // get the model stream
val dataStream  = ... // get the data stream

modelStream.broadcast.connect(dataStream).coFlatMap( ... )

Then you can keep the latest model in a RichCoFlatMapFunction, not necessarily as Operator State, although OperatorState may be a good choice too.
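As an illustration of the pattern described above, here is a minimal sketch in plain Scala (without the Flink dependencies; in a real job this logic would live in a `RichCoFlatMapFunction[Model, Data, Result]` and the outputs would go to a `Collector`; the type names are hypothetical):

```scala
// Sketch of the "keep the latest model" CoFlatMap pattern, plain Scala only.
case class Model(version: Int, factor: Double)
case class Data(value: Double)

class LatestModelCoFlatMap {
  // Holds the most recent model seen on the broadcast model stream.
  private var currentModel: Option[Model] = None

  // flatMap1: invoked for each element of the model stream.
  def flatMap1(model: Model): Unit = {
    currentModel = Some(model)
  }

  // flatMap2: invoked for each data element; applies the latest model.
  // Data arriving before any model is dropped here; a real job might
  // buffer it instead.
  def flatMap2(data: Data): Option[Double] =
    currentModel.map(m => data.value * m.factor)
}
```

The key point is that the model is just a field of the function instance, updated from one input and read from the other.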

Does it make sense to you ?

Anwar


Re: Apache Flink Operator State as Query Cache

tambunanw
Thanks for the answer. 

Currently the approach I'm using is creating a base/marker interface to stream different types of messages to the same operator. I'm not sure about the performance hit of this compared to the CoFlatMap function.

Basically this provides a query cache, so I'm thinking that instead of using an in-memory cache like Redis, Ignite, etc., I can just use operator state for this.

I just want to gauge whether I need a memory cache, or whether operator state would be fine.

However, I'm concerned about Gen 2 garbage collection when we cache our own state without using operator state. Is there any clarification on that?
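The base/marker-interface approach mentioned above can be sketched as follows (plain Scala, hypothetical type names; in Flink the single stream would carry the base type and the operator would pattern-match on each element):

```scala
// Sketch: different message types flow through one operator via a common
// base (marker) trait, instead of connecting two typed streams.
sealed trait Message
case class ModelUpdate(factor: Double) extends Message
case class DataPoint(value: Double)    extends Message

// One operator receives the mixed stream, dispatches on the subtype, and
// keeps the latest model as its state. It starts with an identity model
// (factor 1.0); a real job might drop or buffer early data instead.
class MixedStreamOperator {
  private var factor: Double = 1.0

  def process(msg: Message): Option[Double] = msg match {
    case ModelUpdate(f) => factor = f; None  // update state, emit nothing
    case DataPoint(v)   => Some(v * factor)  // apply the latest model
  }
}
```

Compared to CoFlatMap, this trades two cleanly typed inputs for one union-typed input and a runtime dispatch per element.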




Re: Apache Flink Operator State as Query Cache

Stephan Ewen
Hi!

In general, if you can keep state in Flink, you get better throughput/latency/consistency and have one less system to worry about (external k/v store). State outside means that the Flink processes can be slimmer and need fewer resources and as such recover a bit faster. There are use cases for that as well.

Storing the model in OperatorState is a good idea, if you can. On the roadmap is to migrate the operator state to managed memory as well, so that should take care of the GC issues.

We are just adding functionality to make the key/value operator state usable in CoMap/CoFlatMap as well (currently it only works in windows and in Map/FlatMap/Filter functions over a KeyedStream).
Until then, you should be able to use a simple Java HashMap and the "Checkpointed" interface to make it persistent.
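To illustrate what the Checkpointed interface buys you, here is a plain-Scala sketch of the snapshot/restore contract (the signatures are simplified from Flink's `Checkpointed[T]` interface, which also receives checkpoint id/timestamp arguments; the HashMap plays the role of the query cache):

```scala
import java.util.{HashMap => JHashMap}

// Sketch of the Checkpointed contract: the function keeps its cache in a
// Java HashMap, hands a copy to the framework at each checkpoint, and is
// given a snapshot back on recovery after a failure.
class CachingFunction {
  private var cache = new JHashMap[String, Double]()

  def put(key: String, value: Double): Unit = { cache.put(key, value); () }

  def get(key: String): Option[Double] =
    if (cache.containsKey(key)) Some(cache.get(key)) else None

  // Called by the framework at a checkpoint barrier. Copy the map so that
  // later mutations don't leak into the snapshot.
  def snapshotState(): JHashMap[String, Double] =
    new JHashMap(cache)

  // Called on recovery, before processing resumes.
  def restoreState(state: JHashMap[String, Double]): Unit =
    cache = new JHashMap(state)
}
```

Anything written after the snapshot is lost on recovery, which is exactly the exactly-once rollback behavior checkpointing provides.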

Greetings,
Stephan



Re: Apache Flink Operator State as Query Cache

tambunanw
Hi Stephan, 

>Storing the model in OperatorState is a good idea, if you can. On the roadmap is to migrate the operator state to managed memory as well, so that should take care of the GC issues.
Is this using off-heap memory? In which version can we expect this to be available?

Another question: when will the 0.10 release be out? We would love to upgrade when it's available. Will that version be production-ready for streaming?






Re: Apache Flink Operator State as Query Cache

Aljoscha Krettek
Hi,
I don’t know yet when the operator state will be transitioned to managed memory but it could happen for 1.0 (which will come after 0.10). The good thing is that the interfaces won’t change, so state can be used as it is now.

For 0.10, the release vote is winding down right now, so you can expect the release to happen today or tomorrow. I think streaming is production-ready now; we expect mostly hardening and some infrastructure changes (for example, annotations that specify API stability) for the 1.0 release.

Let us know if you need more information.

Cheers,
Aljoscha


Re: Apache Flink Operator State as Query Cache

tambunanw
Hi Aljoscha, 

Thanks for this one. Looking forward to the 0.10 release.

Cheers


Re: Apache Flink Operator State as Query Cache

rmetzger0
Hi Welly,
Flink 0.10.0 is out; it's just not announced yet.
It's available on Maven Central, and the global mirrors are currently syncing it. This mirror, for example, already has the update: http://apache.mirror.digionline.de/flink/flink-0.10.0/


Re: Apache Flink Operator State as Query Cache

tambunanw
Awesome!

This is really the best weekend gift ever. :)

Cheers


Re: Apache Flink Operator State as Query Cache

tambunanw
Hi Robert, 

Does this version already handle stream imperfections such as out-of-order events?

Is there any resource on how this works, and an API reference?

Cheers

On Fri, Nov 13, 2015 at 4:00 PM, Welly Tambunan <[hidden email]> wrote:
Awesome !

This is really the best weekend gift ever. :)

Cheers

On Fri, Nov 13, 2015 at 3:54 PM, Robert Metzger <[hidden email]> wrote:
Hi Welly,
Flink 0.10.0 is out, its just not announced yet. 
Its available on maven central and the global mirrors are currently syncing it. This mirror for example has the update already: http://apache.mirror.digionline.de/flink/flink-0.10.0/

On Fri, Nov 13, 2015 at 9:50 AM, Welly Tambunan <[hidden email]> wrote:
Hi Aljoscha, 

Thanks for this one. Looking forward for 0.10 release version.  

Cheers

On Thu, Nov 12, 2015 at 5:34 PM, Aljoscha Krettek <[hidden email]> wrote:
Hi,
I don’t know yet when the operator state will be transitioned to managed memory but it could happen for 1.0 (which will come after 0.10). The good thing is that the interfaces won’t change, so state can be used as it is now.

For 0.10, the release vote is winding down right now, so you can expect the release to happen today or tomorrow. I think the streaming is production ready now, we expect to mostly to hardening and some infrastructure changes (for example annotations that specify API stability) for the 1.0 release.

Let us know if you need more information.

Cheers,
Aljoscha
> On 12 Nov 2015, at 02:42, Welly Tambunan <[hidden email]> wrote:
>
> Hi Stephan,
>
> >Storing the model in OperatorState is a good idea, if you can. On the roadmap is to migrate the operator state to managed memory as well, so that should take care of the GC issues.
> Is this using off-heap memory? In which version do we expect this to be available?
>
> Another question: when will the release version of 0.10 be out? We would love to upgrade when it's available. That version will be production-ready for streaming, right?
>
>
>
>
>
> On Wed, Nov 11, 2015 at 4:49 PM, Stephan Ewen <[hidden email]> wrote:
> Hi!
>
> In general, if you can keep state in Flink, you get better throughput/latency/consistency and have one less system to worry about (external k/v store). State outside means that the Flink processes can be slimmer and need fewer resources and as such recover a bit faster. There are use cases for that as well.
>
> Storing the model in OperatorState is a good idea, if you can. On the roadmap is to migrate the operator state to managed memory as well, so that should take care of the GC issues.
>
> We are just adding functionality to make the Key/Value operator state usable in CoMap/CoFlatMap as well (currently it only works in windows and in Map/FlatMap/Filter functions over the KeyedStream).
> Until then, you should be able to use a simple Java HashMap and the "Checkpointed" interface to make it persistent.
>
> Greetings,
> Stephan
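The HashMap-plus-Checkpointed approach Stephan suggests can be sketched in plain Java. The class below is a stand-alone simulation whose `snapshotState`/`restoreState` methods only mimic the shape of Flink's `Checkpointed` interface; the class name, the `lookupOrCompute` method, and the placeholder downsampling logic are all illustrative, not actual Flink API:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: keep the query cache in a plain HashMap inside the function and
// expose snapshot/restore hooks shaped like Flink's "Checkpointed" interface.
class CachingFunction {
    private Map<String, Double> cache = new HashMap<>();

    // Normal processing path: compute on a miss, reuse on a hit.
    double lookupOrCompute(String key) {
        return cache.computeIfAbsent(key, k -> expensiveDownsample(k));
    }

    // Placeholder for the real downsampling computation.
    private double expensiveDownsample(String key) {
        return key.length() * 0.5;
    }

    // Shaped like Checkpointed#snapshotState: hand out a serializable copy.
    Map<String, Double> snapshotState() {
        return new HashMap<>(cache);
    }

    // Shaped like Checkpointed#restoreState: reinstall the state on recovery.
    void restoreState(Map<String, Double> state) {
        this.cache = new HashMap<>(state);
    }
}
```

Because the snapshot is a copy, a restored instance serves cached results without recomputation, which is the persistence guarantee the thread is after.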
>
>
> On Sun, Nov 8, 2015 at 10:11 AM, Welly Tambunan <[hidden email]> wrote:
> Thanks for the answer.
>
> Currently, the approach I'm using is creating a base/marker interface to stream different types of messages to the same operator. I'm not sure about the performance hit of this compared to the CoFlatMap function.
>
> Basically, this provides a query cache, so I'm thinking that instead of using an in-memory cache like Redis, Ignite, etc., I can just use operator state.
>
> I just want to gauge whether I need an external memory cache, or whether operator state would be fine.
>
> However, I'm concerned about Gen 2 garbage collection when caching our own state without using operator state. Is there any clarification on that?
>
>
>
> On Sat, Nov 7, 2015 at 12:38 AM, Anwar Rizal <[hidden email]> wrote:
>
> Let me understand your case better. You have a stream of models and a stream of data. To process the data, you need a way to access your model from the subsequent stream operations (map, filter, flatMap, ...).
> I'm not sure in which cases Operator State is a good choice, but I think you can also live without it.
>
> val modelStream = ... // get the model stream
> val dataStream  = ... // get the data stream
>
> modelStream.broadcast.connect(dataStream).coFlatMap( ... )
>
> Then you can keep the latest model in a RichCoFlatMapFunction, not necessarily as Operator State, although maybe OperatorState is a good choice too.
>
> Does it make sense to you?
>
> Anwar
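The pattern in the quoted snippet (remember the latest broadcast model, apply it to every data record) can be illustrated outside Flink with a small, self-contained Java sketch. The class and method names below are hypothetical stand-ins for the two callbacks of a co-flatmap function (one per connected input), not the actual Flink API:

```java
import java.util.ArrayList;
import java.util.List;

// Stand-alone simulation of connect/coFlatMap: one input carries model
// updates (the broadcast side), the other carries data records, and the
// function keeps only the latest model to transform incoming records.
class LatestModelCoFlatMap {
    private double factor = 1.0;              // the "model": here just a scale factor
    private final List<Double> out = new ArrayList<>();

    // Corresponds to the model-side callback: a model update arrives, remember it.
    void onModel(double newFactor) {
        this.factor = newFactor;
    }

    // Corresponds to the data-side callback: apply the latest model to the record.
    void onData(double value) {
        out.add(value * factor);
    }

    List<Double> results() {
        return out;
    }
}
```

The design point is that the model lives as a plain field of the function instance; whether that field is also registered as operator state only affects fault tolerance, not the dataflow wiring.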
>
> On Fri, Nov 6, 2015 at 10:21 AM, Welly Tambunan <[hidden email]> wrote:
> Hi All,
>
> We have high-density data that requires downsampling. However, the downsampling model is very flexible, depending on the client device and user interaction, so it would be wasteful to precompute the results and store them in a DB.
>
> So we want to use Apache Flink to do the downsampling and cache the results for subsequent queries.
>
> We are considering using Flink operator state for that.
>
> Is that the right approach for a memory cache? Or is it preferable to use a memory cache like Redis, etc.?
>
> Any comments will be appreciated.
>
>
> Cheers
> --
> Welly Tambunan
> Triplelands
>
> http://weltam.wordpress.com
> http://www.triplelands.com





Re: Apache Flink Operator State as Query Cache

Kostas Tzoumas
Hi Welly,

This version adds support for specifying and switching between time semantics - processing time, ingestion time, or event time. 

When working with event time, you can specify watermarks to track the progress of event time. So even if events arrive out of order, windows are defined on event time (not arrival time), and the computation is triggered on watermark arrival.


Is this what you are looking for?

Kostas
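A minimal sketch of what Kostas describes, assuming tumbling windows over a single key: events carry their own (event) timestamps, land in the right window regardless of arrival order, and a window fires only once a watermark passes its end. This simulates the semantics in plain Java and is not Flink's windowing API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Simulation of tumbling event-time windows triggered by watermarks.
class EventTimeWindows {
    private final long size;
    private final TreeMap<Long, List<Long>> open = new TreeMap<>(); // windowStart -> events
    private final List<List<Long>> fired = new ArrayList<>();

    EventTimeWindows(long windowSize) {
        this.size = windowSize;
    }

    // Assign by event time, so out-of-order arrivals land in the right window.
    void onEvent(long eventTime) {
        long start = (eventTime / size) * size;
        open.computeIfAbsent(start, s -> new ArrayList<>()).add(eventTime);
    }

    // Fire every window whose end is at or before the watermark.
    void onWatermark(long watermark) {
        while (!open.isEmpty() && open.firstKey() + size <= watermark) {
            fired.add(open.pollFirstEntry().getValue());
        }
    }

    List<List<Long>> firedWindows() {
        return fired;
    }
}
```

For example, events with timestamps 12, 3, 7 arriving in that order still group 3 and 7 into the window [0, 10), and that window is emitted only when a watermark of at least 10 arrives.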



Re: Apache Flink Operator State as Query Cache

tambunanw
Hi Kostas, 

Yes, exactly. Thanks a lot for this one.

That's really what we need!


Cheers


Re: Apache Flink Operator State as Query Cache

Anwar Rizal
In reply to this post by Stephan Ewen
Stephan,

Having a look at the brand-new 0.10 release, I noticed that OperatorState is not implemented for ConnectedStream, which seems to be the opposite of what you said earlier.

Or maybe I misunderstood your sentence here ?

Thanks,
Anwar.



Re: Apache Flink Operator State as Query Cache

Stephan Ewen
Hi Anwar!

0.10.0 was already feature-frozen and under testing at that time. Key/value state on connected streams will have to go into the next release...

Stephan


On Mon, Nov 16, 2015 at 3:00 PM, Anwar Rizal <[hidden email]> wrote:
Stephan,

Having a look at the brand new 0.10 release, I noticed that OperatorState is not implemented for ConnectedStream, which is quite the opposite of what you said below.

Or maybe I misunderstood your sentence here ?

Thanks,
Anwar.


On Wed, Nov 11, 2015 at 10:49 AM, Stephan Ewen <[hidden email]> wrote:
Hi!

In general, if you can keep state in Flink, you get better throughput/latency/consistency and have one less system to worry about (external k/v store). State outside means that the Flink processes can be slimmer and need fewer resources and as such recover a bit faster. There are use cases for that as well.

Storing the model in OperatorState is a good idea, if you can. On the roadmap is to migrate the operator state to managed memory as well, so that should take care of the GC issues.

We are just adding functionality to make the Key/Value operator state usable in CoMap/CoFlatMap as well (currently it only works in windows and in Map/FlatMap/Filter functions over the KeyedStream).
Until the, you should be able to use a simple Java HashMap and use the "Checkpointed" interface to get it persistent.

Greetings,
Stephan


On Sun, Nov 8, 2015 at 10:11 AM, Welly Tambunan <[hidden email]> wrote:
Thanks for the answer. 

Currently the approach that i'm using right now is creating a base/marker interface to stream different type of message to the same operator. Not sure about the performance hit about this compare to the CoFlatMap function. 

Basically this one is providing query cache, so i'm thinking instead of using in memory cache like redis, ignite etc, i can just use operator state for this one. 

I just want to gauge do i need to use memory cache or operator state would be just fine. 

However i'm concern about the Gen 2 Garbage Collection for caching our own state without using operator state. Is there any clarification on that one ? 



On Sat, Nov 7, 2015 at 12:38 AM, Anwar Rizal <[hidden email]> wrote:

Let me understand your case better here. You have a stream of model and stream of data. To process the data, you will need a way to access your model from the subsequent stream operations (map, filter, flatmap, ..).
I'm not sure in which case Operator State is a good choice, but I think you can also live without.

val modelStream = .... // get the model stream
val dataStream   = 

modelStream.broadcast.connect(dataStream). coFlatMap(  ) Then you can keep the latest model in a CoFlatMapRichFunction, not necessarily as Operator State, although maybe OperatorState is a good choice too.

Does it make sense to you ?

Anwar

On Fri, Nov 6, 2015 at 10:21 AM, Welly Tambunan <[hidden email]> wrote:
Hi All, 

We have a high density data that required a downsample. However this downsample model is very flexible based on the client device and user interaction. So it will be wasteful to precompute and store to db. 

So we want to use Apache Flink to do downsampling and cache the result for subsequent query. 

We are considering using Flink Operator state for that one. 

Is that the right approach to use that for memory cache ? Or if that preferable using memory cache like redis etc. 

Any comments will be appreciated. 


Cheers
--




--



Reply | Threaded
Open this post in threaded view
|

Re: Apache Flink Operator State as Query Cache

tambunanw
Hi Stephan, 

So that will be in Flink 1.0, right?

Cheers


Re: Apache Flink Operator State as Query Cache

Aljoscha Krettek
Hi,
yes, it will be in 1.0. I’m working on adding it to master right now.

Cheers,
Aljoscha
