ProcessFunction example

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

ProcessFunction example

Philippe CAPARROY
I think there is an error in the code snippet describing the ProcessFunction time out example :  https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html


@Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {

        // get the state for the key that scheduled the timer
        CountWithTimestamp result = state.value();

        // check if this is an outdated timer or the latest timer
        if (timestamp == result.lastModified) {
            // emit the state
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }
If, as stated in the example, the CountWithTimeoutFunction should emit a key/count if no further update occurred during the  minute elapsed since last update, the test should be : 

if (timestamp == result.lastModified + 60000) { 
	// emit the state on timeout 
	out.collect(new Tuple2<String, Long>(result.key, result.count)); 
}

As stated in the javadoc of the ProcessFunction : the timestamp arg of on timer method is the timestamp of the firing timer.






Reply | Threaded
Open this post in threaded view
|

Re: ProcessFunction example

Kostas Kloudas
Hi Philippe,

You are right! 
Thanks for reporting it!
We will fix it asap.

Kostas

On Mar 9, 2017, at 8:38 AM, Philippe Caparroy <[hidden email]> wrote:

I think there is an error in the code snippet describing the ProcessFunction time out example :  https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html


@Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {

        // get the state for the key that scheduled the timer
        CountWithTimestamp result = state.value();

        // check if this is an outdated timer or the latest timer
        if (timestamp == result.lastModified) {
            // emit the state
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }
If, as stated in the example, the CountWithTimeoutFunction should emit a key/count if no further update occurred during the  minute elapsed since last update, the test should be : 

if (timestamp == result.lastModified + 60000) { 
	// emit the state on timeout 
	out.collect(new Tuple2<String, Long>(result.key, result.count)); 
}

As stated in the javadoc of the ProcessFunction : the timestamp arg of on timer method is the timestamp of the firing timer.







Reply | Threaded
Open this post in threaded view
|

Re: ProcessFunction example

Mauro Cortellazzi

Hi, i've created a PR to fix scala and java examples and the error suggested by Philippe.

Hope it will be helpful!!

Mauro


Il 09/03/2017 10:30, Kostas Kloudas ha scritto:
Hi Philippe,

You are right! 
Thanks for reporting it!
We will fix it asap.

Kostas

On Mar 9, 2017, at 8:38 AM, Philippe Caparroy <[hidden email]> wrote:

I think there is an error in the code snippet describing the ProcessFunction time out example :  https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html


@Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {

        // get the state for the key that scheduled the timer
        CountWithTimestamp result = state.value();

        // check if this is an outdated timer or the latest timer
        if (timestamp == result.lastModified) {
            // emit the state
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }
If, as stated in the example, the CountWithTimeoutFunction should emit a key/count if no further update occurred during the  minute elapsed since last update, the test should be : 
if (timestamp == result.lastModified + 60000) { 
	// emit the state on timeout 
	out.collect(new Tuple2<String, Long>(result.key, result.count)); 
}

As stated in the javadoc of the ProcessFunction : the timestamp arg of on timer method is the timestamp of the firing timer.