Dear all,
I want to clear some of my variables in KeyedBroadcastProcessFunction after a certain time. I implemented the onTimer() function, but even though I am using processing time, set like so:

    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

I am getting null when ctx.timestamp() is called. How do I ensure that some of the variables or states inside KeyedBroadcastProcessFunction are cleared after a certain time interval (say 3 seconds)?

Here is a skeleton of what it looks like. I am using Flink 1.9.

    public static class myFunction extends KeyedBroadcastProcessFunction<String, Point, Point, Tuple3<Integer, List<Integer>, List<String>>> {

        List<Point> fixedPoints;

        public void processBroadcastElement(
                Point myPoint,
                Context ctx,
                Collector<Tuple3<Integer, List<Integer>, List<String>>> out) throws Exception {
            /* put myPoint in broadcastState */
        }

        public void processElement(
                Point queryPoint,
                ReadOnlyContext ctx,
                Collector<Tuple3<Integer, List<Integer>, List<String>>> out) throws Exception {
            /* collect output */
            System.out.println("TimeStamp: " + ctx.timestamp()); // returns "TimeStamp: null"
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 3000); // java.lang.NullPointerException
        }

        // does not run due to java.lang.NullPointerException
        public void onTimer(
                long timestamp,
                OnTimerContext ctx,
                Collector<Tuple3<Integer, List<Integer>, List<String>>> out) throws Exception {
            System.out.println("Clearing...");
            fixedPoints.clear();
            System.out.println("Clearing...COMPLETE");
        }
    }

When I change to env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); I get timestamps like these:

    TimeStamp: 1573016104294
    TimeStamp: 1573016104289
    TimeStamp: 1573016104292

However, the onTimer() function is never called and fixedPoints is not cleared. My data streams right now are very limited: the keyedStream has 8 elements while the broadcast stream has 7.

I would really appreciate any help!

Best Regards,
Komal
Hi Komal,

Please read the Javadoc of BaseContext#timestamp [1] carefully: it returns null if your program is set to org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime. If you want to fetch the current processing timestamp, please use `ctx.currentProcessingTime()`.

Best,
Yun Tang
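To make the cause of the NullPointerException concrete, here is a minimal plain-Java sketch that does not use Flink at all. It assumes, as the reply above describes, that the timestamp accessor hands back a boxed Long that can be null under processing time; adding 3000 to it then forces an unboxing that fails. The class and method names (NullTimestampDemo, eventTimeDeadline, processingTimeDeadline) are hypothetical and exist only for this illustration.

```java
public class NullTimestampDemo {

    // Hypothetical helper mirroring `ctx.timestamp() + 3000` from the question.
    // The parameter is a boxed Long, so the addition unboxes it first and
    // throws NullPointerException when the value is null.
    static long eventTimeDeadline(Long elementTimestamp, long delayMs) {
        return elementTimestamp + delayMs;
    }

    // Hypothetical helper mirroring `ctx.currentProcessingTime() + delay`.
    // A primitive long can never be null, so this addition is always safe.
    static long processingTimeDeadline(long nowMs, long delayMs) {
        return nowMs + delayMs;
    }

    public static void main(String[] args) {
        Long missingTimestamp = null; // stands in for a null element timestamp
        try {
            eventTimeDeadline(missingTimestamp, 3000L);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException while unboxing a null Long");
        }

        // Wall-clock time stands in for the current processing time here.
        long now = System.currentTimeMillis();
        long deadline = processingTimeDeadline(now, 3000L);
        System.out.println("Deadline is " + (deadline - now) + " ms from now");
    }
}
```

In the Flink job itself, the same idea would mean computing the timer deadline from the processing-time clock rather than from the element timestamp.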
The ctx.currentProcessingTime() escaped my notice. Thank you for pointing it out, Yun Tang. I now set my processing-time timer using

    ctx.timerService().registerProcessingTimeTimer(ctx.currentProcessingTime()+2000);

and it works.

On Wed, 6 Nov 2019 at 21:57, Yun Tang <[hidden email]> wrote: