ctx.timestamp() returning null when using Processing Time


Komal Mariam
Dear all, 

I want to clear some of my variables in a KeyedBroadcastProcessFunction after a certain time. I implemented the onTimer() function, but even though I am using processing time, set like so: env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime), I get null when ctx.timestamp() is called.

How do I ensure that some of the variables or state inside a KeyedBroadcastProcessFunction are cleared after a certain time interval (say 3 seconds)?

Here is a skeleton of what it looks like. I am using Flink 1.9.


public static class myFunction extends KeyedBroadcastProcessFunction<String, Point, Point, Tuple3<Integer, List<Integer>, List<String>>> {

    List<Point> fixedPoints;

    public void processBroadcastElement(
            Point myPoint,
            Context ctx,
            Collector<Tuple3<Integer, List<Integer>, List<String>>> out) throws Exception {
        /* put myPoint in broadcastState */
    }

    public void processElement(
            Point queryPoint,
            ReadOnlyContext ctx,
            Collector<Tuple3<Integer, List<Integer>, List<String>>> out) throws Exception {
        /* collect output */

        System.out.println("TimeStamp: " + ctx.timestamp());               // prints "TimeStamp: null"
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 3000); // java.lang.NullPointerException
    }

    // does not run due to java.lang.NullPointerException
    public void onTimer(
            long timestamp,
            OnTimerContext ctx,
            Collector<Tuple3<Integer, List<Integer>, List<String>>> out) throws Exception {
        System.out.println("Clearing...");
        fixedPoints.clear();
        System.out.println("Clearing...COMPLETE");
    }
}

When I change to env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime), I get timestamps like these:
TimeStamp: 1573016104289
TimeStamp: 1573016104294
TimeStamp: 1573016104292

However, the onTimer() function is never called and fixedPoints is not cleared.

My data streams are very limited right now: the keyed stream has 8 elements and the broadcast stream has 7.

I would really appreciate any help!

Best Regards,
Komal


Re: ctx.timestamp() returning null when using Processing Time

Yun Tang

Hi Komal,

Please read the Javadoc of BaseContext#timestamp() [1] carefully: it returns null if your program is set to org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime. If you want the current processing time, please use ctx.currentProcessingTime().

[1] https://github.com/apache/flink/blob/9b43f13a50848382fbd634081b82509f464e62ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BaseBroadcastProcessFunction.java#L50

Best

Yun Tang
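
The NullPointerException in the original snippet is most likely plain Java auto-unboxing: timestamp() returns a boxed Long, so `ctx.timestamp() + 3000` unboxes a null. Below is a minimal, Flink-free sketch of that behaviour. The class and method names (NullTimestampDemo, elementTimestamp, fireTime) are hypothetical stand-ins for illustration, not part of Flink.

```java
class NullTimestampDemo {
    // Hypothetical stand-in for ctx.timestamp(): returns null, as the thread
    // reports happens under TimeCharacteristic.ProcessingTime.
    static Long elementTimestamp() {
        return null;
    }

    // Mirrors `ctx.timestamp() + 3000`: unboxing a null Long throws.
    static boolean additionThrowsNpe() {
        try {
            long fireAt = elementTimestamp() + 3000;
            return fireAt < 0; // not reached when the timestamp is null
        } catch (NullPointerException e) {
            return true;
        }
    }

    // Falls back to a processing-time clock when no element timestamp exists.
    static long fireTime(Long timestamp, long processingTimeNow, long delayMs) {
        long base = (timestamp != null) ? timestamp : processingTimeNow;
        return base + delayMs;
    }

    public static void main(String[] args) {
        System.out.println("NPE on null timestamp: " + additionThrowsNpe());
        System.out.println("Fire at: " + fireTime(null, System.currentTimeMillis(), 3000));
    }
}
```

In the real function the fallback clock would be ctx.currentProcessingTime(), as suggested above.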

 

From: Komal Mariam <[hidden email]>
Date: Wednesday, November 6, 2019 at 6:19 PM
To: user <[hidden email]>
Subject: ctx.timestamp() returning null when using Processing Time

 


Re: ctx.timestamp() returning null when using Processing Time

Komal Mariam
The ctx.currentProcessingTime() method escaped my notice. Thank you for pointing it out, Yun Tang.

I now set my processing-time timer using ctx.timerService().registerProcessingTimeTimer(ctx.currentProcessingTime() + 2000); and it works.
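
For readers who want to see the register-then-clear pattern in isolation, here is a small sketch that runs without Flink. FakeTimerService, ClearingFunction and TimerDemo are hypothetical classes written for this illustration; only the method names currentProcessingTime(), registerProcessingTimeTimer() and onTimer() mirror the Flink calls quoted in this thread, and the simulated clock is an assumption that stands in for real processing time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical, Flink-free model of a processing-time timer service.
class FakeTimerService {
    private long now;                                   // simulated clock in ms
    private final TreeSet<Long> timers = new TreeSet<>();

    FakeTimerService(long start) { this.now = start; }

    long currentProcessingTime() { return now; }

    void registerProcessingTimeTimer(long time) { timers.add(time); }

    // Advances the clock and fires every timer that is due, earliest first.
    void advanceTo(long time, ClearingFunction fn) {
        now = time;
        while (!timers.isEmpty() && timers.first() <= now) {
            fn.onTimer(timers.pollFirst());
        }
    }
}

// Hypothetical stand-in for the user function from the question.
class ClearingFunction {
    final List<String> fixedPoints = new ArrayList<>();

    // Schedules cleanup relative to the processing-time clock rather than
    // the element timestamp, which is null under processing time.
    void processElement(String point, FakeTimerService timerService) {
        fixedPoints.add(point);
        timerService.registerProcessingTimeTimer(
                timerService.currentProcessingTime() + 2000);
    }

    void onTimer(long timestamp) {
        fixedPoints.clear();
    }
}

class TimerDemo {
    public static void main(String[] args) {
        FakeTimerService timers = new FakeTimerService(0);
        ClearingFunction fn = new ClearingFunction();
        fn.processElement("p1", timers);
        timers.advanceTo(1999, fn);
        System.out.println(fn.fixedPoints.size()); // timer not yet due
        timers.advanceTo(2000, fn);
        System.out.println(fn.fixedPoints.size()); // timer fired, list cleared
    }
}
```

The design point is only that the firing time is derived from the processing-time clock, so no element timestamp is needed.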

On Wed, 6 Nov 2019 at 21:57, Yun Tang <[hidden email]> wrote:
