Hi all,
I am facing an issue with the onTimer method in a ProcessFunction:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

class TimerTest extends ProcessFunction<Tuple2<String, String>, String> {

    private static final long serialVersionUID = 1L;

    @Override
    public void processElement(Tuple2<String, String> arg0,
            ProcessFunction<Tuple2<String, String>, String>.Context ctx,
            Collector<String> arg2) throws Exception {
        // The second tuple field is parsed as a timestamp in milliseconds.
        long parseLong = Long.parseLong(arg0.f1);
        TimerService timerService = ctx.timerService();
        // Register a processing-time timer 5 seconds after the parsed value.
        ctx.timerService().registerProcessingTimeTimer(parseLong + 5000);
    }

    @Override
    public void onTimer(long timestamp,
            ProcessFunction<Tuple2<String, String>, String>.OnTimerContext ctx,
            Collector<String> out) throws Exception {
        super.onTimer(timestamp, ctx, out);
        System.out.println("Executing timmer" + timestamp);
        out.collect("Timer Testing..");
    }
}
Hi Puneet,
The value of the registered timer should be within the startTime and endTime of your job. For example, if the job starts at processing time t1 and stops at processing time t2, you have to make sure that t1 < `parseLong + 5000` < t2.
Best,
Hequn
On Mon, Jan 7, 2019 at 5:50 PM Puneet Kinra <[hidden email]> wrote:
Hi Hequn,
It's a streaming job.
On Mon, Jan 7, 2019 at 5:51 PM Hequn Cheng <[hidden email]> wrote:
Hi Puneet,
Maybe you can show us or explain a bit more about your pipeline. From what I see, your ProcessFunction looks correct. Are you sure the registration takes place?
Regards,
Timo
On 07.01.19 at 14:15, Puneet Kinra wrote:
I checked that. The function exits when I call ctx.timerService().
On Mon, Jan 7, 2019 at 10:27 PM Timo Walther <[hidden email]> wrote:
Hi Puneet,
Could you print `parseLong + 5000` and `ctx.timerService().currentProcessingTime()` and check the values?
I know it is a streaming program. What I mean is that the timer you have registered may not fall within the running interval of your job, so it has not been triggered. For example, parseLong + 5000 = 5000 or parseLong + 5000 = 100000000000 (very big).
Best,
Hequn
On Tue, Jan 8, 2019 at 1:38 AM Puneet Kinra <[hidden email]> wrote:
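For what it's worth, the comparison suggested above can be sketched in plain Java with no Flink dependency. `TimerSanityCheck`, `delayMillis`, `classify`, and the one-hour threshold are hypothetical names and values made up for this illustration; in the real job, `nowMillis` would be the value returned by `ctx.timerService().currentProcessingTime()`.

```java
/** Hypothetical helper, not part of Flink: compares a timer timestamp with "now". */
class TimerSanityCheck {

    /** Milliseconds from now until the timer is due; negative if it lies in the past. */
    static long delayMillis(long timerTimestamp, long nowMillis) {
        return timerTimestamp - nowMillis;
    }

    /** Rough label for a timer; maxDelayMillis is an arbitrary threshold for this sketch. */
    static String classify(long timerTimestamp, long nowMillis, long maxDelayMillis) {
        long delay = delayMillis(timerTimestamp, nowMillis);
        if (delay < 0) {
            return "PAST";
        }
        if (delay > maxDelayMillis) {
            return "FAR_FUTURE";
        }
        return "OK";
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();   // stand-in for currentProcessingTime()
        long oneHour = 60L * 60L * 1000L;        // assumed upper bound, adjust as needed

        // A tiny parsed value such as 0 gives 0 + 5000, far behind the wall clock.
        System.out.println("timer=5000      -> " + classify(5000L, now, oneHour));
        // A timer set relative to the current time lands 5 seconds ahead.
        System.out.println("timer=now+5000  -> " + classify(now + 5000L, now, oneHour));
    }
}
```

If the second tuple field turns out to be a duration or a small counter rather than an epoch timestamp in milliseconds, registering the timer at `ctx.timerService().currentProcessingTime() + 5000` instead of `parseLong + 5000` would keep it relative to the current processing time.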
Hi Hequn,
Weird behaviour: when I call ctx.timerService(), the function exits without even throwing an error.
On Tuesday, January 8, 2019, Hequn Cheng <[hidden email]> wrote:
Hi Puneet,
Can you explain it in more detail? Do you mean the job finishes before you call ctx.timerService()? Maybe you have to let your source run for a longer time.
It would be better to show us the whole pipeline of your job. For example, write some sample code (or provide a git link) that reproduces your problem easily.
Best,
Hequn
On Tue, Jan 8, 2019 at 11:44 AM Puneet Kinra <[hidden email]> wrote:
Sure, I will do that.
On Tue, Jan 8, 2019 at 7:25 PM Hequn Cheng <[hidden email]> wrote: