I am trying to have my job also run a periodic action by using a custom source that emits a dummy element periodically and a sink that executes the callback, as shown in the code below. However as soon as I start the job and check the state in the JobManager UI this particular Sink->Source combo is in state 'FINISHED' I know based on logging that the sink never received any elements. What am I doing wrong?
```scala env .addSource(PeriodicSource(1.minutes)) .addSink { _ => foo() } ``` ```scala import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext import scala.concurrent.duration.FiniteDuration case class PeriodicSource(interval: FiniteDuration) extends SourceFunction[Unit] { @volatile private var active = false override def run(ctx: SourceContext[Unit]): Unit = { while (active) { sleep() if (active) { ctx.getCheckpointLock ctx.collect(Unit) } } } override def cancel(): Unit = { active = false } private def sleep(): Unit = { val startTimeMs = System.currentTimeMillis() val desiredSleepMs = interval.toMillis do { Thread.sleep(math.min(desiredSleepMs, 100)) } while (active && (System.currentTimeMillis() - startTimeMs) < desiredSleepMs) } } ``` |
could the problem be as simple as var active being never true?
On 04.03.2016 03:08, shikhar wrote: > I am trying to have my job also run a periodic action by using a custom > source that emits a dummy element periodically and a sink that executes the > callback, as shown in the code below. However as soon as I start the job and > check the state in the JobManager UI this particular Sink->Source combo is > in state 'FINISHED' I know based on logging that the sink never received any > elements. What am I doing wrong? > > ```scala > env > .addSource(PeriodicSource(1.minutes)) > .addSink { _ => foo() } > ``` > > ```scala > import org.apache.flink.streaming.api.functions.source.SourceFunction > import > org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext > > import scala.concurrent.duration.FiniteDuration > > case class PeriodicSource(interval: FiniteDuration) extends > SourceFunction[Unit] { > @volatile private var active = false > > override def run(ctx: SourceContext[Unit]): Unit = { > while (active) { > sleep() > if (active) { > ctx.getCheckpointLock > ctx.collect(Unit) > } > } > } > > override def cancel(): Unit = { > active = false > } > > private def sleep(): Unit = { > val startTimeMs = System.currentTimeMillis() > val desiredSleepMs = interval.toMillis > do { > Thread.sleep(math.min(desiredSleepMs, 100)) > } while (active && (System.currentTimeMillis() - startTimeMs) < > desiredSleepMs) > } > } > ``` > > > > -- > View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Periodic-actions-tp5290.html > Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com. > |
Free forum by Nabble | Edit this page |