Implementing a tick service


Benoît Paris-2
Hello all :)!

I'm having trouble creating a tick service.

Goal: register a TableSource that emits a Row roughly every 200 ms in processing time. The Row would contain a single column, "counter", that is incremented by 1 with each Row.

Current attempt: Using TimerService
A TableSource with
public DataStream<Long> getDataStream(StreamExecutionEnvironment execEnv) {
    // The process function below emits Long, so the stream is typed accordingly.
    return execEnv
        .fromElements((Long) offset) // default 0L, one element
        .keyBy(new NullByteKeySelector<>())
        .process(new TickKeyedProcessFunction(200L))
        .forceNonParallel();
}
And a KeyedProcessFunction with onTimer doing the heavy lifting:
@Override
public void processElement(Long value, Context context, Collector<Long> collector) throws IOException {
    // called once, for the single seed element
    counter.update(value);
    // register a timer at the current processing time so it fires as soon as possible
    long now = context.timerService().currentProcessingTime();
    context.timerService().registerProcessingTimeTimer(now);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
    // increment the counter kept in keyed state
    long current = counter.value() + 1;
    counter.update(current);
    // schedule the next tick one interval after the timer that just fired
    ctx.timerService().registerProcessingTimeTimer(timestamp + interval);
    out.collect(current);
}
Now, the runtime tells me the Source is in FINISHED status. So obviously there must be limitations around re-scheduling one key inside onTimer.

Is there a way to use the TimerService to work around that?
Also, how would you implement this tick service by other means?

Cheers
Ben



Re: Implementing a tick service

Benoît Paris-2
Hello all!

Please disregard the last message; I used Thread.sleep() and a stateful source function instead.
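For readers landing on this thread, the sleep-based approach can be sketched roughly as follows. This is a plain-Java stand-in, not Flink code: `TickLoop`, `run` and `cancel` are hypothetical names that mirror the run()/cancel() shape of a source function, and the `LongConsumer` plays the role of the call that emits a record. A real stateful source would also save and restore the counter through checkpointed state, which this sketch leaves out.

```java
import java.util.function.LongConsumer;

/** Hypothetical stand-in for a sleep-driven tick source. */
class TickLoop {
    private final long intervalMs;
    private long counter;
    // volatile so that a cancel() issued from another thread is seen by run()
    private volatile boolean running = true;

    TickLoop(long intervalMs, long offset) {
        this.intervalMs = intervalMs;
        this.counter = offset;
    }

    /** Emits counter + 1, counter + 2, ... roughly every intervalMs until cancelled. */
    public void run(LongConsumer out) {
        while (running) {
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                // restore the interrupt flag and stop ticking
                Thread.currentThread().interrupt();
                return;
            }
            if (!running) {
                return; // cancelled while sleeping, do not emit another tick
            }
            counter++;
            out.accept(counter);
        }
    }

    public void cancel() {
        running = false;
    }

    public long counter() {
        return counter;
    }
}
```

Calling `new TickLoop(200L, 0L).run(System.out::println)` would print an increasing counter about every 200 ms until `cancel()` is invoked from another thread.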

But just out of curiosity, can processing-time Timers get rescheduled inside the onTimer method?
On Mon, Jan 20, 2020 at 7:04 PM Benoît Paris <[hidden email]> wrote:
[original message, quoted in full above]

--
Benoît Paris
Explainable Machine Learning Engineer
Tel: +33 6 60 74 23 00
http://benoit.paris
http://explicable.ml

Re: Implementing a tick service

Dominik Wosiński
Hey,
you have access to the context in `onTimer`, so you can easily reschedule the timer when it fires.

Best,
Dom.
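
As an illustration of the rescheduling pattern described in this reply, here is a minimal, simulated sketch in plain Java. It does not use Flink: `FakeTimerService`, `TimerCallback`, `advanceTo` and `ticks` are hypothetical names, and time is advanced by hand rather than by a real clock. The callback registers the next timer at `timestamp + interval`, which is the same move as calling `registerProcessingTimeTimer` from inside `onTimer`. If that pattern is allowed, the FINISHED status in the original post is more likely explained by the bounded `fromElements` source completing than by a restriction on rescheduling, though that is an assumption this thread does not confirm.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

final class RescheduleSketch {
    /** Hypothetical stand-in for a timer service: a queue of pending timestamps. */
    static final class FakeTimerService {
        private final PriorityQueue<Long> pending = new PriorityQueue<>();

        void registerTimer(long timestamp) {
            pending.add(timestamp);
        }
    }

    /** Hypothetical callback shape, loosely modelled on onTimer(timestamp, ctx, out). */
    interface TimerCallback {
        void onTimer(long timestamp, FakeTimerService timers);
    }

    /** Fires every pending timer whose timestamp is <= now, earliest first. */
    static void advanceTo(long now, FakeTimerService timers, TimerCallback callback) {
        while (!timers.pending.isEmpty() && timers.pending.peek() <= now) {
            long ts = timers.pending.poll();
            callback.onTimer(ts, timers); // the callback may register further timers
        }
    }

    /** Returns the counter values emitted between time 0 and `until`, inclusive. */
    public static List<Long> ticks(long interval, long until) {
        List<Long> out = new ArrayList<>();
        FakeTimerService timers = new FakeTimerService();
        long[] counter = {0L};
        timers.registerTimer(0L); // seed timer, like the one set in processElement
        advanceTo(until, timers, (ts, t) -> {
            counter[0]++;
            out.add(counter[0]);
            t.registerTimer(ts + interval); // reschedule from inside the callback
        });
        return out;
    }
}
```

With an interval of 200 and simulated time advanced to 1000, timers fire at 0, 200, 400, 600, 800 and 1000, so `ticks(200L, 1000L)` yields the counter values 1 through 6.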