onTimer function is not getting executed and job is marked as finished.


Puneet Kinra-2
Hi all,

I am facing an issue with the onTimer method of a ProcessFunction: it is never executed, and the job is marked as finished.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

class TimerTest extends ProcessFunction<Tuple2<String, String>, String> {

    private static final long serialVersionUID = 1L;

    @Override
    public void processElement(Tuple2<String, String> arg0,
            ProcessFunction<Tuple2<String, String>, String>.Context ctx,
            Collector<String> arg2) throws Exception {
        // f1 is parsed as a long and used as the base of the timer timestamp
        long parseLong = Long.parseLong(arg0.f1);
        TimerService timerService = ctx.timerService();
        // Register a processing-time timer 5000 ms after the parsed value
        timerService.registerProcessingTimeTimer(parseLong + 5000);
    }

    @Override
    public void onTimer(long timestamp,
            ProcessFunction<Tuple2<String, String>, String>.OnTimerContext ctx,
            Collector<String> out) throws Exception {
        super.onTimer(timestamp, ctx, out);
        System.out.println("Executing timer " + timestamp);
        out.collect("Timer Testing..");
    }
}

--
Cheers 

Puneet Kinra

Mobile:+918800167808 | Skype : [hidden email]

e-mail :[hidden email]


Re: onTimer function is not getting executed and job is marked as finished.

Hequn Cheng
Hi Puneet,

The timestamp of the registered timer should fall between the start time and the end time of your job. For example, if the job starts at processing time t1 and stops at processing time t2, you have to make sure that t1 < `parseLong + 5000` < t2.

Best, Hequn
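
A minimal sketch of the condition described above, in plain Java without Flink dependencies. The class name TimerWindowCheck, the method firesDuringJob, and the sample timestamps are made up for illustration; the check only restates t1 < `parseLong + 5000` < t2 and makes no claim about how Flink handles timers internally.

```java
public class TimerWindowCheck {

    /**
     * Returns true when the timer timestamp lies strictly between the job's
     * start and end times (all values in epoch milliseconds).
     */
    public static boolean firesDuringJob(long timerTimestamp, long jobStart, long jobEnd) {
        return jobStart < timerTimestamp && timerTimestamp < jobEnd;
    }

    public static void main(String[] args) {
        long jobStart = 1_546_863_000_000L; // hypothetical start time t1
        long jobEnd = jobStart + 60_000L;   // hypothetical end time t2, one minute later

        // f1 holds an epoch-millisecond value shortly after the job started
        long parseLong = jobStart + 10_000L;
        System.out.println(firesDuringJob(parseLong + 5000, jobStart, jobEnd));

        // f1 holds 0, so the timer timestamp is only 5000
        System.out.println(firesDuringJob(0L + 5000, jobStart, jobEnd));
    }
}
```

If the value in f1 is not an epoch-millisecond timestamp close to the current time, the second case is the one to look for.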

In reply to Puneet Kinra, Mon, Jan 7, 2019 at 5:50 PM.

Re: onTimer function is not getting executed and job is marked as finished.

Puneet Kinra-2
Hi Hequn,

It's a streaming job.

In reply to Hequn Cheng, Mon, Jan 7, 2019 at 5:51 PM.

Re: onTimer function is not getting executed and job is marked as finished.

Timo Walther
Hi Puneet,

Maybe you can show us or explain a bit more about your pipeline. From what I can see, your ProcessFunction looks correct. Are you sure the timer registration actually takes place?

Regards,
Timo

In reply to Puneet Kinra, 07.01.19 at 14:15.

Re: onTimer function is not getting executed and job is marked as finished.

Puneet Kinra-2
I checked that. The function exits when I call ctx.timerService().

In reply to Timo Walther, Mon, Jan 7, 2019 at 10:27 PM.

Re: onTimer function is not getting executed and job is marked as finished.

Hequn Cheng
Hi Puneet,

Could you print `parseLong + 5000` and `ctx.timerService().currentProcessingTime()` and check the values?
I know it is a streaming program. What I mean is that the timer you registered may not fall within the interval in which your job runs, so it has not been triggered. For example, parseLong + 5000 = 5000 or parseLong + 5000 = 100000000000 (very big).

Best, Hequn
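
To make the comparison suggested above easier to read, the two values can be turned into a single message. The following is a plain-Java sketch without Flink dependencies; TimerDiagnostics and describe are hypothetical names. Inside processElement, the second argument would be the value returned by `ctx.timerService().currentProcessingTime()`; here System.currentTimeMillis() stands in for it.

```java
public class TimerDiagnostics {

    /** Describes where a timer timestamp lies relative to the current processing time. */
    public static String describe(long timerTimestamp, long currentProcessingTime) {
        long delta = timerTimestamp - currentProcessingTime;
        if (delta <= 0) {
            return "timer is " + (-delta) + " ms in the past";
        }
        return "timer is " + delta + " ms in the future";
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis(); // stand-in for currentProcessingTime()

        // f1 holding a small number instead of epoch milliseconds
        System.out.println(describe(Long.parseLong("0") + 5000, now));

        // f1 holding the current time in epoch milliseconds
        System.out.println(describe(Long.parseLong(Long.toString(now)) + 5000, now));
    }
}
```

A message reporting a timer far in the past or far in the future is a hint that f1 does not contain the kind of timestamp the timer registration expects.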


In reply to Puneet Kinra, Tue, Jan 8, 2019 at 1:38 AM.

Re: onTimer function is not getting executed and job is marked as finished.

Puneet Kinra-2
Hi Hequn,

Weird behaviour: when I call ctx.timerService(), the function exits without even throwing an error.

In reply to Hequn Cheng, Tuesday, January 8, 2019.

Re: onTimer function is not getting executed and job is marked as finished.

Hequn Cheng
Hi Puneet,

Can you explain it in more detail? Do you mean the job is finished before you call ctx.timerService()?
Maybe you have to let your source run for a longer time.

It would be better to show us the whole pipeline of your job. For example, write some sample code (or provide a git link) that reproduces your problem easily.

Best, Hequn
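
The point about letting the source run longer can be illustrated with a plain-Java analogy. This is not Flink code: a ScheduledExecutorService stands in for the timer service, and shutting it down stands in for the job finishing. It assumes that timers still pending when the job ends are discarded, which is what the symptoms in this thread suggest but is not confirmed here. TimerLifetimeDemo and timerFired are hypothetical names.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class TimerLifetimeDemo {

    /**
     * Schedules a timer after timerDelayMs, lets the "job" run for jobRuntimeMs,
     * then shuts everything down. Returns whether the timer callback ran.
     */
    public static boolean timerFired(long timerDelayMs, long jobRuntimeMs) {
        ScheduledExecutorService timers = Executors.newSingleThreadScheduledExecutor();
        AtomicBoolean fired = new AtomicBoolean(false);
        timers.schedule(() -> fired.set(true), timerDelayMs, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(jobRuntimeMs);   // the "source" is alive for this long
            timers.shutdownNow();         // the "job" finishes; pending timers are dropped
            timers.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            timers.shutdownNow();
        }
        return fired.get();
    }

    public static void main(String[] args) {
        // Job ends long before the timer is due
        System.out.println("short job: fired = " + timerFired(500, 50));
        // Job outlives the timer
        System.out.println("long job:  fired = " + timerFired(50, 500));
    }
}
```

In the first call the work ends before the timer is due, so the callback never runs and no error is reported, which resembles the behaviour described in this thread.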


In reply to Puneet Kinra, Tue, Jan 8, 2019 at 11:44 AM.

Re: onTimer function is not getting executed and job is marked as finished.

Puneet Kinra-2
Sure, I will do that. 

In reply to Hequn Cheng, Tue, Jan 8, 2019 at 7:25 PM.