Periodic actions

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Periodic actions

shikhar
I am trying to have my job also run a periodic action by using a custom source that emits a dummy element periodically and a sink that executes the callback, as shown in the code below. However as soon as I start the job and check the state in the JobManager UI this particular Sink->Source combo is in state 'FINISHED' I know based on logging that the sink never received any elements. What am I doing wrong?

```scala
    env
      .addSource(PeriodicSource(1.minutes))
      .addSink { _ => foo() }
```

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

import scala.concurrent.duration.FiniteDuration

case class PeriodicSource(interval: FiniteDuration) extends SourceFunction[Unit] {
  @volatile private var active = false

  override def run(ctx: SourceContext[Unit]): Unit = {
    while (active) {
      sleep()
      if (active) {
        ctx.getCheckpointLock
        ctx.collect(Unit)
      }
    }
  }

  override def cancel(): Unit = {
    active = false
  }

  private def sleep(): Unit = {
    val startTimeMs = System.currentTimeMillis()
    val desiredSleepMs = interval.toMillis
    do {
      Thread.sleep(math.min(desiredSleepMs, 100))
    } while (active && (System.currentTimeMillis() - startTimeMs) < desiredSleepMs)
  }
}
```
Reply | Threaded
Open this post in threaded view
|

Re: Periodic actions

Chesnay Schepler
could the problem be as simple as var active being never true?

On 04.03.2016 03:08, shikhar wrote:

> I am trying to have my job also run a periodic action by using a custom
> source that emits a dummy element periodically and a sink that executes the
> callback, as shown in the code below. However as soon as I start the job and
> check the state in the JobManager UI this particular Sink->Source combo is
> in state 'FINISHED' I know based on logging that the sink never received any
> elements. What am I doing wrong?
>
> ```scala
>      env
>        .addSource(PeriodicSource(1.minutes))
>        .addSink { _ => foo() }
> ```
>
> ```scala
> import org.apache.flink.streaming.api.functions.source.SourceFunction
> import
> org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
>
> import scala.concurrent.duration.FiniteDuration
>
> case class PeriodicSource(interval: FiniteDuration) extends
> SourceFunction[Unit] {
>    @volatile private var active = false
>
>    override def run(ctx: SourceContext[Unit]): Unit = {
>      while (active) {
>        sleep()
>        if (active) {
>          ctx.getCheckpointLock
>          ctx.collect(Unit)
>        }
>      }
>    }
>
>    override def cancel(): Unit = {
>      active = false
>    }
>
>    private def sleep(): Unit = {
>      val startTimeMs = System.currentTimeMillis()
>      val desiredSleepMs = interval.toMillis
>      do {
>        Thread.sleep(math.min(desiredSleepMs, 100))
>      } while (active && (System.currentTimeMillis() - startTimeMs) <
> desiredSleepMs)
>    }
> }
> ```
>
>
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Periodic-actions-tp5290.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
>

Reply | Threaded
Open this post in threaded view
|

Re: Periodic actions

shikhar
Wow that's embarassing :D That was indeed the issue