Hi Puneet,

Can you explain it in more detail? Do you mean the job finishes before you call ctx.timerService()? Maybe you have to let your source run for a longer time.

It would be better to show us the whole pipeline of your job. For example, write some sample code (or provide a git link) that reproduces your problem easily.

Best, Hequn

On Tue, Jan 8, 2019 at 11:44 AM Puneet Kinra <[hidden email]> wrote:

Hi Hequn,

I'm seeing weird behaviour: when I call ctx.timerService(), the function exits without even throwing an error.
On Tuesday, January 8, 2019, Hequn Cheng <[hidden email]> wrote:

Hi Puneet,

Could you print `parseLong + 5000` and `ctx.timerService().currentProcessingTime()` and check the values?

I know it is a streaming program. What I mean is that the timer you have registered is not within the running interval of your job, so the timer has not been triggered. For example, parseLong + 5000 = 5000 or parseLong + 5000 = 100000000000 (very big).

Best, Hequn

On Tue, Jan 8, 2019 at 1:38 AM Puneet Kinra <[hidden email]> wrote:

I checked this: the function exits when I call ctx.timerService().

On Mon, Jan 7, 2019 at 10:27 PM Timo Walther <[hidden email]> wrote:

Hi Puneet,
Maybe you can show us or explain a bit more about your pipeline. From what I see, your ProcessFunction looks correct. Are you sure the registration takes place?

Regards, Timo
On 07.01.19 at 14:15, Puneet Kinra wrote:
Hi Hequn,
It's a streaming job.
On Mon, Jan 7, 2019 at 5:51 PM Hequn Cheng <[hidden email]> wrote:
Hi Puneet,
The timestamp of the registered timer should fall between the start time and the end time of your job. For example, if the job starts at processing time t1 and stops at processing time t2, you have to make sure that t1 < `parseLong + 5000` < t2.
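
The condition above can be checked with plain arithmetic before involving Flink at all. The following is only a minimal sketch: the class `TimerWindowCheck`, the method `isWithinJob`, and the one-minute job length are hypothetical names and values chosen for illustration, not part of Flink or of the original job.

```java
// Hypothetical helper, not part of Flink: evaluates t1 < timer < t2.
final class TimerWindowCheck {

    /** True when the timer timestamp lies strictly between job start (t1) and job end (t2). */
    static boolean isWithinJob(long timerTimestamp, long jobStart, long jobEnd) {
        return jobStart < timerTimestamp && timerTimestamp < jobEnd;
    }

    public static void main(String[] args) {
        long t1 = System.currentTimeMillis(); // assumed job start (epoch millis)
        long t2 = t1 + 60_000L;               // assumed job end, one minute later

        // Timer derived from a small payload value: 5000 ms after the epoch.
        long fromPayload = Long.parseLong("0") + 5000;
        // Timer derived from the clock: 5 seconds after the assumed job start.
        long fromClock = t1 + 5000;

        System.out.println(isWithinJob(fromPayload, t1, t2)); // false
        System.out.println(isWithinJob(fromClock, t1, t2));   // true
    }
}
```

If the payload field does not hold an epoch timestamp in milliseconds, `parseLong + 5000` lands far outside the window, which is the case described in the message above.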
Best, Hequn
On Mon, Jan 7, 2019 at 5:50 PM Puneet Kinra <[hidden email]> wrote:
Hi All,
I am facing an issue with the onTimer method in a ProcessFunction.
class TimerTest extends ProcessFunction<Tuple2<String, String>, String> {

    /** */
    private static final long serialVersionUID = 1L;

    @Override
    public void processElement(Tuple2<String, String> arg0,
            ProcessFunction<Tuple2<String, String>, String>.Context ctx,
            Collector<String> arg2) throws Exception {
        // TODO Auto-generated method stub
        long parseLong = Long.parseLong(arg0.f1);
        TimerService timerService = ctx.timerService();
        ctx.timerService().registerProcessingTimeTimer(parseLong + 5000);
    }

    @Override
    public void onTimer(long timestamp,
            ProcessFunction<Tuple2<String, String>, String>.OnTimerContext ctx,
            Collector<String> out) throws Exception {
        // TODO Auto-generated method stub
        super.onTimer(timestamp, ctx, out);
        System.out.println("Executing timmer" + timestamp);
        out.collect("Timer Testing..");
    }
}
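
Hequn's suggestion in this thread is to print `parseLong + 5000` next to the current processing time and compare them. A rough way to do that comparison outside the job is sketched below. `TimerValueReport` and `describe` are hypothetical names, and `System.currentTimeMillis()` stands in for `ctx.timerService().currentProcessingTime()` on the assumption that both are epoch milliseconds.

```java
// Hypothetical diagnostic, not part of Flink: reports where payload + 5000
// sits relative to a clock reading. Both values are treated as epoch millis.
final class TimerValueReport {

    static String describe(String payload, long nowMillis) {
        long timer = Long.parseLong(payload) + 5000; // same arithmetic as processElement
        long delta = timer - nowMillis;              // negative or zero: not in the future
        String position = delta <= 0 ? "already due" : "due in the future";
        return "timer=" + timer + " now=" + nowMillis
                + " delta=" + delta + " ms (" + position + ")";
    }

    public static void main(String[] args) {
        // Stand-in for ctx.timerService().currentProcessingTime() (assumption).
        long now = System.currentTimeMillis();
        System.out.println(describe("0", now));
        System.out.println(describe(Long.toString(now), now));
    }
}
```

The same two values can be logged inside `processElement` to see what the job actually registers for each record.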