schedule tasks `inside` Flink


schedule tasks `inside` Flink

Michal Fijolek
Hello.
My app needs a Map[K, V] as a simple cache for business data, which needs to be invalidated periodically, let's say once per day.
Right now I'm using a rather naive approach:
import java.util.concurrent.{Executors, TimeUnit}

trait Dictionary[K, V] extends Serializable {
  @volatile private var cache: Map[K, V] = Map()

  def lookup(id: K): Option[V] = cache.get(id)

  private def fetchDictionary: Map[K, V] = ???

  private def updateDictionary(): Unit = {
    cache = fetchDictionary
  }

  val invalidate = new Runnable with Serializable {
    override def run(): Unit = updateDictionary()
  }

  // refresh once per day, starting with an immediate load
  Executors.newSingleThreadScheduledExecutor()
    .scheduleAtFixedRate(invalidate, 0L, 1L, TimeUnit.DAYS)
}
This seems wrong, because I guess I should do this kind of thing `inside` Flink, and when I stop the Flink job, nobody is going to stop the scheduled invalidation tasks.
What would be the idiomatic Flink way to approach this problem? How can I schedule tasks and make Flink aware of them?

Thanks,
Michal

Re: schedule tasks `inside` Flink

Fabian Hueske-2
Hi Michal,

If I got your requirements right, you could try to solve this by serving the updates through a regular DataStream.
You could add a SourceFunction which periodically emits a new version of the cache, and a CoFlatMap operator which receives the regular streamed input on its first input and the cache updates on its second input. If the Flink job is stopped, the update source will be canceled like any regular source.

You might also want to expose the cache as operator state to Flink to ensure it is checkpointed and restored in case of a failure.
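
A minimal sketch of what this could look like with the Scala DataStream API (the element types and fetchDictionary are placeholders carried over from your snippet, not Flink API):

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.util.Collector

// Periodically emits a fresh snapshot of the whole dictionary.
abstract class DictionarySource[K, V](intervalMs: Long)
    extends SourceFunction[Map[K, V]] {

  def fetchDictionary(): Map[K, V]

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Map[K, V]]): Unit = {
    while (running) {
      ctx.collect(fetchDictionary())
      Thread.sleep(intervalMs)
    }
  }

  // Flink calls this when the job is stopped, so no threads are left behind.
  override def cancel(): Unit = running = false
}

// Keeps the latest dictionary snapshot and enriches the main stream with it.
// To survive failures, the cache could additionally be exposed as operator
// state via Flink's checkpointing interfaces.
class DictionaryCoFlatMap[K, V]
    extends CoFlatMapFunction[K, Map[K, V], (K, Option[V])] {

  private var cache: Map[K, V] = Map()

  override def flatMap1(id: K, out: Collector[(K, Option[V])]): Unit =
    out.collect((id, cache.get(id)))

  override def flatMap2(update: Map[K, V], out: Collector[(K, Option[V])]): Unit =
    cache = update
}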

Best, Fabian


Re: schedule tasks `inside` Flink

Stephan Ewen
Fabian's suggestion with the co-map is good. You can use a "broadcast()" connect to make sure the dictionary gets to all nodes.
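
For example, building on the sketch above (the stream names are placeholders):

import org.apache.flink.streaming.api.scala._ // implicit TypeInformation

// Broadcast each dictionary snapshot to all parallel instances of the co-flatmap.
val enriched = mainStream
  .connect(updates.broadcast())
  .flatMap(new DictionaryCoFlatMap[String, String])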

If you want full control over how and when to read the data, a scheduled task is not a bad solution either. Make sure you implement this as a "RichFunction", so you can use "open()" to read the first set of data and "close()" to stop your threads.
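
A sketch of that variant (assuming a daily refresh; fetchDictionary is again a placeholder):

import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// The executor is created in open() and shut down in close(),
// so its lifecycle is tied to the Flink task.
abstract class DictionaryMap[K, V] extends RichMapFunction[K, (K, Option[V])] {

  def fetchDictionary(): Map[K, V]

  @volatile private var cache: Map[K, V] = Map()
  @transient private var scheduler: ScheduledExecutorService = _

  override def open(parameters: Configuration): Unit = {
    cache = fetchDictionary() // first load, before any element is processed
    scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = cache = fetchDictionary()
    }, 1L, 1L, TimeUnit.DAYS)
  }

  override def close(): Unit = {
    if (scheduler != null) {
      scheduler.shutdownNow() // the refresh thread stops with the job
    }
  }

  override def map(id: K): (K, Option[V]) = (id, cache.get(id))
}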

As a related issue: we are looking into extensions to the API to explicitly support such "slow changing inputs", similar to how "broadcast variables" work in the DataSet API.
This is the JIRA issue; if you post your use case there, you can make it part of the discussion: https://issues.apache.org/jira/browse/FLINK-3514

Greetings,
Stephan

Re: schedule tasks `inside` Flink

Michal Fijolek
Thanks for the help, guys!
Eventually I implemented it as a RichFunction, using the open() and close() methods.

Michał
