Hi,
I used the KeyedProcessFunction example from the Flink website [1] and implemented my own KeyedProcessFunction to do some approximate counting [2]. This worked very well. Then I switched the data source to consume strings from Twitter [3]. The source is consuming the strings; I can see them when I debug. However, the time comparison in the onTimer() method never matches, so I never get the results of the window processing.

I don't know exactly why this happens. My guess is that my state is too heavy. But even so, shouldn't the time match at some point so that the evaluation of my window finishes?

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example
[2] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowSocket.java
[3] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowTwitter.java

Kind Regards,
Felipe
|
Hi,
Isn’t your problem that the source is constantly emitting data and bumping your timers? Keep in mind that the code you are basing yours on has the following characteristic:

> In the following example a KeyedProcessFunction maintains counts per key, and emits a key/count pair whenever a minute passes without an update for that key
Piotrek
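
To make the difference concrete, here is a minimal plain-Java sketch (not Flink code) that simulates the two timer strategies on a list of arrival times. The class and method names are hypothetical and exist only for this illustration. It assumes a single key and a timer that fires as soon as its deadline is reached. With a source that emits more often than the timeout, the "inactivity" strategy from the documentation example never fires, while a strategy that registers a timer only when none is pending fires once per timeout.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation, not part of Flink: it only mimics the timer logic.
final class TimerPatterns {

    // Inactivity pattern (as in the docs example): every element moves the
    // deadline to arrival + timeout, so output appears only after a quiet gap.
    // Only firings that happen up to the last arrival are reported.
    static List<Long> inactivityFirings(long[] arrivals, long timeout) {
        List<Long> firings = new ArrayList<>();
        long deadline = -1; // -1 means no timer is pending
        for (long t : arrivals) {
            if (deadline >= 0 && deadline <= t) {
                firings.add(deadline); // the key stayed quiet long enough
            }
            deadline = t + timeout; // each new element bumps the timer
        }
        return firings;
    }

    // Window pattern: register a timer only when none is pending, and
    // clear it when it fires, so elements do not postpone the deadline.
    static List<Long> windowFirings(long[] arrivals, long timeout) {
        List<Long> firings = new ArrayList<>();
        long deadline = -1;
        for (long t : arrivals) {
            if (deadline >= 0 && deadline <= t) {
                firings.add(deadline);
                deadline = -1; // state cleared, the next element opens a new window
            }
            if (deadline < 0) {
                deadline = t + timeout;
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        long[] arrivals = new long[21];
        for (int i = 0; i < arrivals.length; i++) {
            arrivals[i] = i * 1000L; // one element per second
        }
        System.out.println("inactivity: " + inactivityFirings(arrivals, 5000L));
        System.out.println("window:     " + windowFirings(arrivals, 5000L));
    }
}
```

In a real KeyedProcessFunction the pending deadline would live in keyed state rather than in a local variable; the sketch only shows why a busy source starves the first pattern.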
|
I am sorry, I wanted to point to this reference, https://stackoverflow.com/a/47071833/2096986, which implements a window on a ProcessFunction in Flink.

On Tue, Jun 18, 2019 at 9:22 AM Piotr Nowojski <[hidden email]> wrote:
|
I made some progress based on [1]. My code is here [2]. Basically, I am calling "ctx.timerService().registerProcessingTimeTimer(timeoutTime);" inside the processElement method to trigger the onTimer method. When the onTimer method is triggered, I clean the state using "hllStateTwitter.clear();".

However, I still have a question. I set the timeout to 5000 milliseconds, and the onTimer method is triggered at a slightly different interval. Why is this happening?

process: 1560850703025 - 1560850708025
onTimer: 1560850708025 - 1560850713017 = 4992
3> estimate cardinality: 544
process: 1560850709019 - 1560850714019
onTimer: 1560850714019 - 1560850718942 = 4923
3> estimate cardinality: 485
process: 1560850714027 - 1560850719027
onTimer: 1560850719027 - 1560850723936 = 4909
3> estimate cardinality: 438
process: 1560850719035 - 1560850724035

[1] https://stackoverflow.com/a/53646529/2096986
[2] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowTwitter.java

On Tue, Jun 18, 2019 at 11:15 AM Felipe Gutierrez <[hidden email]> wrote:
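
The numbers in the log above can be checked with a small plain-Java sketch. It only does arithmetic on the logged values. It assumes that each "process" line prints the registration time followed by the timer target, and that the first number on each "onTimer" line is the timestamp passed to onTimer. The meaning of the second onTimer number is not stated in the thread, so the sketch just reproduces the printed difference. The class and method names are hypothetical.

```java
// Hypothetical helper for this illustration: arithmetic on the logged values only.
final class TimerLogCheck {

    // "process" lines: {time when the timer was registered, timer target}
    static final long[][] PROCESS = {
        {1560850703025L, 1560850708025L},
        {1560850709019L, 1560850714019L},
        {1560850714027L, 1560850719027L},
        {1560850719035L, 1560850724035L},
    };

    // "onTimer" lines: {first printed value, second printed value}
    static final long[][] ON_TIMER = {
        {1560850708025L, 1560850713017L},
        {1560850714019L, 1560850718942L},
        {1560850719027L, 1560850723936L},
    };

    // Delay between registration and timer target on a "process" line.
    static long registeredDelay(int i) {
        return PROCESS[i][1] - PROCESS[i][0];
    }

    // True if the first onTimer value equals the target registered just before it.
    static boolean firedAtRegisteredTarget(int i) {
        return ON_TIMER[i][0] == PROCESS[i][1];
    }

    // The difference that the log prints after "=" on each onTimer line.
    static long printedDifference(int i) {
        return ON_TIMER[i][1] - ON_TIMER[i][0];
    }

    // Spacing between two consecutive onTimer timestamps.
    static long firingSpacing(int i) {
        return ON_TIMER[i + 1][0] - ON_TIMER[i][0];
    }

    public static void main(String[] args) {
        for (int i = 0; i < ON_TIMER.length; i++) {
            System.out.println("delay=" + registeredDelay(i)
                + " firedAtTarget=" + firedAtRegisteredTarget(i)
                + " printed=" + printedDifference(i));
        }
    }
}
```

Under those assumptions, every timer is registered exactly 5000 ms ahead and the first onTimer value matches the registered target, while consecutive firings are more than 5000 ms apart because the next timer is only registered when the next element arrives.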
|