Periodic actions
Posted by shikhar on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Periodic-actions-tp5290.html
I am trying to have my job also run a periodic action by using a custom source that emits a dummy element periodically and a sink that executes the callback, as shown in the code below. However as soon as I start the job and check the state in the JobManager UI this particular Sink->Source combo is in state 'FINISHED' I know based on logging that the sink never received any elements. What am I doing wrong?
```scala
env
.addSource(PeriodicSource(1.minutes))
.addSink { _ => foo() }
```
```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import scala.concurrent.duration.FiniteDuration
case class PeriodicSource(interval: FiniteDuration) extends SourceFunction[Unit] {
@volatile private var active = false
override def run(ctx: SourceContext[Unit]): Unit = {
while (active) {
sleep()
if (active) {
ctx.getCheckpointLock
ctx.collect(Unit)
}
}
}
override def cancel(): Unit = {
active = false
}
private def sleep(): Unit = {
val startTimeMs = System.currentTimeMillis()
val desiredSleepMs = interval.toMillis
do {
Thread.sleep(math.min(desiredSleepMs, 100))
} while (active && (System.currentTimeMillis() - startTimeMs) < desiredSleepMs)
}
}
```