OneInputStreamOperatorTestHarness.snapshot doesn't include timers?

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

OneInputStreamOperatorTestHarness.snapshot doesn't include timers?

Ken Krugler
Hi all,

It looks to me like the OperatorSubtaskState returned from OneInputStreamOperatorTestHarness.snapshot fails to include any timers that had been registered via registerProcessingTimeTimer but had not yet fired when the snapshot was saved.

Is this a known limitation of OneInputStreamOperatorTestHarness?

If not, is there anything special I need to do when setting up the test harness to ensure that timers are saved?

Below is the unit test, which shows how the test harness is being set up and run.

The TimerFunction used in this test does seem to be doing the right thing, as using it in a simple job on a local Flink cluster works as expected when creating & then restarting from a savepoint.

Thanks,

— Ken

==================================================================================================
TimerTest.java
==================================================================================================
package com.scaleunlimited.flinkcrawler.functions;

import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.scaleunlimited.flinkcrawler.tools.TimerTool;

public class TimerTest {
    public static final Logger LOGGER = LoggerFactory.getLogger(TimerTest.class);

    private List<Long> _firedTimers = new ArrayList<Long>();

    @Before
    public void setUp() throws Exception {
    }
    
    @Test
    public void testTimerSaving() throws Throwable {
        
        // This operator doesn't really do much at all, but the first element
        // it processes will create a timer for (timestamp+1).
        // Whenever that timer fires, it will create another timer for 
        // (timestamp+1).
        KeyedProcessOperator<Integer, Integer, Integer> operator = 
            new KeyedProcessOperator<>(new TimerFunction(_firedTimers));
        
        // Create a test harness from scratch
        OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = 
            makeTestHarness(operator, null);
        
        // We begin at time zero
        testHarness.setProcessingTime(0);

        // Process some elements, which should also create a timer for time 1.
        int inputs[] = new int[] {1, 2, 3};
        for (int input : inputs) {
            testHarness.processElement(new StreamRecord<>(input));
        }
        
        // Force some time to pass, which should keep moving the timer ahead,
        // finally leaving it set for time 10.
        for (long i = 1; i < 10; i++) {
            testHarness.setProcessingTime(i);
        }
        
        // Save the state, which we assume should include the timer we set for
        // time 10.
        OperatorSubtaskState savedState = 
            testHarness.snapshot(0L, testHarness.getProcessingTime());
        
        // Close the first test harness
        testHarness.close();
        
        // Create a new test harness using the saved state (which we assume
        // includes the timer for time 10).
        testHarness = makeTestHarness(operator, savedState);
        
        // Force more time to pass, which should keep moving the timer ahead.
        for (long i = 10; i < 20; i++) {
            testHarness.setProcessingTime(i);
        }
        
        // Close the second test harness and make sure all the timers we expect
        // actually fired.
        testHarness.close();
        for (long i = 1; i < 20; i++) {
            
            // TODO This expectation currently fails, since Timers don't
            // seem to be included in the snapshot, at least the one produced by
            // the test harness.
            assertTrue(_firedTimers.contains(i));
        }
    }

    private OneInputStreamOperatorTestHarness<Integer, Integer> makeTestHarness(
            KeyedProcessOperator<Integer, Integer, Integer> operator,
            OperatorSubtaskState savedState) 
            throws Exception {
        OneInputStreamOperatorTestHarness<Integer, Integer> result;
        result = 
            new KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer>(
                    operator,
                    new TimerTool.IdentityKeySelector<Integer>(),
                    BasicTypeInfo.INT_TYPE_INFO);
        result.setup();
        result.open();
        if (savedState != null) {
            result.initializeState(savedState);
        }
        return result;
    }
}


==================================================================================================
TimerFunction.java
==================================================================================================
package com.scaleunlimited.flinkcrawler.functions;

import java.util.List;

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings("serial")
public class TimerFunction
    extends KeyedProcessFunction<Integer, Integer, Integer> {
    static final Logger LOGGER = LoggerFactory.getLogger(TimerFunction.class);
   
    List<Long> _firedTimers;
    long _period;

    public TimerFunction(List<Long> firedTimers) {
        this(firedTimers, 1);
    }

    public TimerFunction(List<Long> firedTimers, long period) {
        super();
        _firedTimers = firedTimers;
        _period = period;
    }

    @Override
    public void onTimer(long timestamp,
                        KeyedProcessFunction<Integer, Integer, Integer>.OnTimerContext context,
                        Collector<Integer> out) throws Exception {
        super.onTimer(timestamp, context, out);
        _firedTimers.add(timestamp);
        long nextTimestamp = timestamp + _period;
        LOGGER.info("Firing at {}; Setting new timer for {}",
                    timestamp,
                    nextTimestamp);
        context.timerService().registerProcessingTimeTimer(nextTimestamp);
    }

    @Override
    public void processElement( Integer input,
                                KeyedProcessFunction<Integer, Integer, Integer>.Context context,
                                Collector<Integer> out)
        throws Exception {
        
        LOGGER.info("Processing input {}", input);
        if (_firedTimers.isEmpty()) {
            long firstTimestamp = 
                context.timerService().currentProcessingTime() + _period;
            LOGGER.info("Setting initial timer for {}",
                        firstTimestamp);
            context.timerService().registerProcessingTimeTimer(firstTimestamp);
        }
        
        out.collect(input);
    }
}



--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra

Reply | Threaded
Open this post in threaded view
|

Re: OneInputStreamOperatorTestHarness.snapshot doesn't include timers?

Piotr Nowojski
Hi,

You made a small mistake when restoring from state using test harness, that I myself have also done in the past. Problem is with an ordering of those calls:

        result.open();
        if (savedState != null) {
            result.initializeState(savedState);
        }

Open is supposed to be called after initializeState, and if you look into the code of AbstractStreamOperatorTestHarness#open, if it is called before initialize, it will initialize harness without any state.

Unfortunate is that this is implicit behaviour that doesn’t throw any error (test harness is not part of a Flink’s public api). I will try to fix this: https://issues.apache.org/jira/browse/FLINK-10159

Piotrek

On 16 Aug 2018, at 00:24, Ken Krugler <[hidden email]> wrote:

Hi all,

It looks to me like the OperatorSubtaskState returned from OneInputStreamOperatorTestHarness.snapshot fails to include any timers that had been registered via registerProcessingTimeTimer but had not yet fired when the snapshot was saved.

Is this a known limitation of OneInputStreamOperatorTestHarness?

If not, is there anything special I need to do when setting up the test harness to ensure that timers are saved?

Below is the unit test, which shows how the test harness is being set up and run.

The TimerFunction used in this test does seem to be doing the right thing, as using it in a simple job on a local Flink cluster works as expected when creating & then restarting from a savepoint.

Thanks,

— Ken

==================================================================================================
TimerTest.java
==================================================================================================
package com.scaleunlimited.flinkcrawler.functions;

import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.scaleunlimited.flinkcrawler.tools.TimerTool;

public class TimerTest {
    public static final Logger LOGGER = LoggerFactory.getLogger(TimerTest.class);

    private List<Long> _firedTimers = new ArrayList<Long>();

    @Before
    public void setUp() throws Exception {
    }
    
    @Test
    public void testTimerSaving() throws Throwable {
        
        // This operator doesn't really do much at all, but the first element
        // it processes will create a timer for (timestamp+1).
        // Whenever that timer fires, it will create another timer for 
        // (timestamp+1).
        KeyedProcessOperator<Integer, Integer, Integer> operator = 
            new KeyedProcessOperator<>(new TimerFunction(_firedTimers));
        
        // Create a test harness from scratch
        OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = 
            makeTestHarness(operator, null);
        
        // We begin at time zero
        testHarness.setProcessingTime(0);

        // Process some elements, which should also create a timer for time 1.
        int inputs[] = new int[] {1, 2, 3};
        for (int input : inputs) {
            testHarness.processElement(new StreamRecord<>(input));
        }
        
        // Force some time to pass, which should keep moving the timer ahead,
        // finally leaving it set for time 10.
        for (long i = 1; i < 10; i++) {
            testHarness.setProcessingTime(i);
        }
        
        // Save the state, which we assume should include the timer we set for
        // time 10.
        OperatorSubtaskState savedState = 
            testHarness.snapshot(0L, testHarness.getProcessingTime());
        
        // Close the first test harness
        testHarness.close();
        
        // Create a new test harness using the saved state (which we assume
        // includes the timer for time 10).
        testHarness = makeTestHarness(operator, savedState);
        
        // Force more time to pass, which should keep moving the timer ahead.
        for (long i = 10; i < 20; i++) {
            testHarness.setProcessingTime(i);
        }
        
        // Close the second test harness and make sure all the timers we expect
        // actually fired.
        testHarness.close();
        for (long i = 1; i < 20; i++) {
            
            // TODO This expectation currently fails, since Timers don't
            // seem to be included in the snapshot, at least the one produced by
            // the test harness.
            assertTrue(_firedTimers.contains(i));
        }
    }

    private OneInputStreamOperatorTestHarness<Integer, Integer> makeTestHarness(
            KeyedProcessOperator<Integer, Integer, Integer> operator,
            OperatorSubtaskState savedState) 
            throws Exception {
        OneInputStreamOperatorTestHarness<Integer, Integer> result;
        result = 
            new KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer>(
                    operator,
                    new TimerTool.IdentityKeySelector<Integer>(),
                    BasicTypeInfo.INT_TYPE_INFO);
        result.setup();
        result.open();
        if (savedState != null) {
            result.initializeState(savedState);
        }
        return result;
    }
}


==================================================================================================
TimerFunction.java
==================================================================================================
package com.scaleunlimited.flinkcrawler.functions;

import java.util.List;

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings("serial")
public class TimerFunction
    extends KeyedProcessFunction<Integer, Integer, Integer> {
    static final Logger LOGGER = LoggerFactory.getLogger(TimerFunction.class);
   
    List<Long> _firedTimers;
    long _period;

    public TimerFunction(List<Long> firedTimers) {
        this(firedTimers, 1);
    }

    public TimerFunction(List<Long> firedTimers, long period) {
        super();
        _firedTimers = firedTimers;
        _period = period;
    }

    @Override
    public void onTimer(long timestamp,
                        KeyedProcessFunction<Integer, Integer, Integer>.OnTimerContext context,
                        Collector<Integer> out) throws Exception {
        super.onTimer(timestamp, context, out);
        _firedTimers.add(timestamp);
        long nextTimestamp = timestamp + _period;
        LOGGER.info("Firing at {}; Setting new timer for {}",
                    timestamp,
                    nextTimestamp);
        context.timerService().registerProcessingTimeTimer(nextTimestamp);
    }

    @Override
    public void processElement( Integer input,
                                KeyedProcessFunction<Integer, Integer, Integer>.Context context,
                                Collector<Integer> out)
        throws Exception {
        
        LOGGER.info("Processing input {}", input);
        if (_firedTimers.isEmpty()) {
            long firstTimestamp = 
                context.timerService().currentProcessingTime() + _period;
            LOGGER.info("Setting initial timer for {}",
                        firstTimestamp);
            context.timerService().registerProcessingTimeTimer(firstTimestamp);
        }
        
        out.collect(input);
    }
}



--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra


Reply | Threaded
Open this post in threaded view
|

Re: OneInputStreamOperatorTestHarness.snapshot doesn't include timers?

Ken Krugler
Hi Piotr,

Thanks, and darn it that’s something I should have noticed.

— Ken


On Aug 16, 2018, at 4:37 AM, Piotr Nowojski <[hidden email]> wrote:

Hi,

You made a small mistake when restoring from state using test harness, that I myself have also done in the past. Problem is with an ordering of those calls:

        result.open();
        if (savedState != null) {
            result.initializeState(savedState);
        }

Open is supposed to be called after initializeState, and if you look into the code of AbstractStreamOperatorTestHarness#open, if it is called before initialize, it will initialize harness without any state.

Unfortunate is that this is implicit behaviour that doesn’t throw any error (test harness is not part of a Flink’s public api). I will try to fix this: https://issues.apache.org/jira/browse/FLINK-10159

Piotrek

On 16 Aug 2018, at 00:24, Ken Krugler <[hidden email]> wrote:

Hi all,

It looks to me like the OperatorSubtaskState returned from OneInputStreamOperatorTestHarness.snapshot fails to include any timers that had been registered via registerProcessingTimeTimer but had not yet fired when the snapshot was saved.

Is this a known limitation of OneInputStreamOperatorTestHarness?

If not, is there anything special I need to do when setting up the test harness to ensure that timers are saved?

Below is the unit test, which shows how the test harness is being set up and run.

The TimerFunction used in this test does seem to be doing the right thing, as using it in a simple job on a local Flink cluster works as expected when creating & then restarting from a savepoint.

Thanks,

— Ken

==================================================================================================
TimerTest.java
==================================================================================================
package com.scaleunlimited.flinkcrawler.functions;

import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.scaleunlimited.flinkcrawler.tools.TimerTool;

public class TimerTest {
    public static final Logger LOGGER = LoggerFactory.getLogger(TimerTest.class);

    private List<Long> _firedTimers = new ArrayList<Long>();

    @Before
    public void setUp() throws Exception {
    }
    
    @Test
    public void testTimerSaving() throws Throwable {
        
        // This operator doesn't really do much at all, but the first element
        // it processes will create a timer for (timestamp+1).
        // Whenever that timer fires, it will create another timer for 
        // (timestamp+1).
        KeyedProcessOperator<Integer, Integer, Integer> operator = 
            new KeyedProcessOperator<>(new TimerFunction(_firedTimers));
        
        // Create a test harness from scratch
        OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = 
            makeTestHarness(operator, null);
        
        // We begin at time zero
        testHarness.setProcessingTime(0);

        // Process some elements, which should also create a timer for time 1.
        int inputs[] = new int[] {1, 2, 3};
        for (int input : inputs) {
            testHarness.processElement(new StreamRecord<>(input));
        }
        
        // Force some time to pass, which should keep moving the timer ahead,
        // finally leaving it set for time 10.
        for (long i = 1; i < 10; i++) {
            testHarness.setProcessingTime(i);
        }
        
        // Save the state, which we assume should include the timer we set for
        // time 10.
        OperatorSubtaskState savedState = 
            testHarness.snapshot(0L, testHarness.getProcessingTime());
        
        // Close the first test harness
        testHarness.close();
        
        // Create a new test harness using the saved state (which we assume
        // includes the timer for time 10).
        testHarness = makeTestHarness(operator, savedState);
        
        // Force more time to pass, which should keep moving the timer ahead.
        for (long i = 10; i < 20; i++) {
            testHarness.setProcessingTime(i);
        }
        
        // Close the second test harness and make sure all the timers we expect
        // actually fired.
        testHarness.close();
        for (long i = 1; i < 20; i++) {
            
            // TODO This expectation currently fails, since Timers don't
            // seem to be included in the snapshot, at least the one produced by
            // the test harness.
            assertTrue(_firedTimers.contains(i));
        }
    }

    private OneInputStreamOperatorTestHarness<Integer, Integer> makeTestHarness(
            KeyedProcessOperator<Integer, Integer, Integer> operator,
            OperatorSubtaskState savedState) 
            throws Exception {
        OneInputStreamOperatorTestHarness<Integer, Integer> result;
        result = 
            new KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer>(
                    operator,
                    new TimerTool.IdentityKeySelector<Integer>(),
                    BasicTypeInfo.INT_TYPE_INFO);
        result.setup();
        result.open();
        if (savedState != null) {
            result.initializeState(savedState);
        }
        return result;
    }
}


==================================================================================================
TimerFunction.java
==================================================================================================
package com.scaleunlimited.flinkcrawler.functions;

import java.util.List;

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings("serial")
public class TimerFunction
    extends KeyedProcessFunction<Integer, Integer, Integer> {
    static final Logger LOGGER = LoggerFactory.getLogger(TimerFunction.class);
   
    List<Long> _firedTimers;
    long _period;

    public TimerFunction(List<Long> firedTimers) {
        this(firedTimers, 1);
    }

    public TimerFunction(List<Long> firedTimers, long period) {
        super();
        _firedTimers = firedTimers;
        _period = period;
    }

    @Override
    public void onTimer(long timestamp,
                        KeyedProcessFunction<Integer, Integer, Integer>.OnTimerContext context,
                        Collector<Integer> out) throws Exception {
        super.onTimer(timestamp, context, out);
        _firedTimers.add(timestamp);
        long nextTimestamp = timestamp + _period;
        LOGGER.info("Firing at {}; Setting new timer for {}",
                    timestamp,
                    nextTimestamp);
        context.timerService().registerProcessingTimeTimer(nextTimestamp);
    }

    @Override
    public void processElement( Integer input,
                                KeyedProcessFunction<Integer, Integer, Integer>.Context context,
                                Collector<Integer> out)
        throws Exception {
        
        LOGGER.info("Processing input {}", input);
        if (_firedTimers.isEmpty()) {
            long firstTimestamp = 
                context.timerService().currentProcessingTime() + _period;
            LOGGER.info("Setting initial timer for {}",
                        firstTimestamp);
            context.timerService().registerProcessingTimeTimer(firstTimestamp);
        }
        
        out.collect(input);
    }
}



--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra

Reply | Threaded
Open this post in threaded view
|

Re: OneInputStreamOperatorTestHarness.snapshot doesn't include timers?

Piotr Nowojski
No problem :) You motivated me to do a fix for that, since I stumbled across this bug/issue myself before and also took me some time in the debugger to find the cause.

Piotrek

On 16 Aug 2018, at 20:05, Ken Krugler <[hidden email]> wrote:

Hi Piotr,

Thanks, and darn it that’s something I should have noticed.

— Ken


On Aug 16, 2018, at 4:37 AM, Piotr Nowojski <[hidden email]> wrote:

Hi,

You made a small mistake when restoring from state using test harness, that I myself have also done in the past. Problem is with an ordering of those calls:

        result.open();
        if (savedState != null) {
            result.initializeState(savedState);
        }

Open is supposed to be called after initializeState, and if you look into the code of AbstractStreamOperatorTestHarness#open, if it is called before initialize, it will initialize harness without any state.

Unfortunate is that this is implicit behaviour that doesn’t throw any error (test harness is not part of a Flink’s public api). I will try to fix this: https://issues.apache.org/jira/browse/FLINK-10159

Piotrek

On 16 Aug 2018, at 00:24, Ken Krugler <[hidden email]> wrote:

Hi all,

It looks to me like the OperatorSubtaskState returned from OneInputStreamOperatorTestHarness.snapshot fails to include any timers that had been registered via registerProcessingTimeTimer but had not yet fired when the snapshot was saved.

Is this a known limitation of OneInputStreamOperatorTestHarness?

If not, is there anything special I need to do when setting up the test harness to ensure that timers are saved?

Below is the unit test, which shows how the test harness is being set up and run.

The TimerFunction used in this test does seem to be doing the right thing, as using it in a simple job on a local Flink cluster works as expected when creating & then restarting from a savepoint.

Thanks,

— Ken

==================================================================================================
TimerTest.java
==================================================================================================
package com.scaleunlimited.flinkcrawler.functions;

import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.scaleunlimited.flinkcrawler.tools.TimerTool;

public class TimerTest {
    public static final Logger LOGGER = LoggerFactory.getLogger(TimerTest.class);

    private List<Long> _firedTimers = new ArrayList<Long>();

    @Before
    public void setUp() throws Exception {
    }
    
    @Test
    public void testTimerSaving() throws Throwable {
        
        // This operator doesn't really do much at all, but the first element
        // it processes will create a timer for (timestamp+1).
        // Whenever that timer fires, it will create another timer for 
        // (timestamp+1).
        KeyedProcessOperator<Integer, Integer, Integer> operator = 
            new KeyedProcessOperator<>(new TimerFunction(_firedTimers));
        
        // Create a test harness from scratch
        OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = 
            makeTestHarness(operator, null);
        
        // We begin at time zero
        testHarness.setProcessingTime(0);

        // Process some elements, which should also create a timer for time 1.
        int inputs[] = new int[] {1, 2, 3};
        for (int input : inputs) {
            testHarness.processElement(new StreamRecord<>(input));
        }
        
        // Force some time to pass, which should keep moving the timer ahead,
        // finally leaving it set for time 10.
        for (long i = 1; i < 10; i++) {
            testHarness.setProcessingTime(i);
        }
        
        // Save the state, which we assume should include the timer we set for
        // time 10.
        OperatorSubtaskState savedState = 
            testHarness.snapshot(0L, testHarness.getProcessingTime());
        
        // Close the first test harness
        testHarness.close();
        
        // Create a new test harness using the saved state (which we assume
        // includes the timer for time 10).
        testHarness = makeTestHarness(operator, savedState);
        
        // Force more time to pass, which should keep moving the timer ahead.
        for (long i = 10; i < 20; i++) {
            testHarness.setProcessingTime(i);
        }
        
        // Close the second test harness and make sure all the timers we expect
        // actually fired.
        testHarness.close();
        for (long i = 1; i < 20; i++) {
            
            // TODO This expectation currently fails, since Timers don't
            // seem to be included in the snapshot, at least the one produced by
            // the test harness.
            assertTrue(_firedTimers.contains(i));
        }
    }

    private OneInputStreamOperatorTestHarness<Integer, Integer> makeTestHarness(
            KeyedProcessOperator<Integer, Integer, Integer> operator,
            OperatorSubtaskState savedState) 
            throws Exception {
        OneInputStreamOperatorTestHarness<Integer, Integer> result;
        result = 
            new KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer>(
                    operator,
                    new TimerTool.IdentityKeySelector<Integer>(),
                    BasicTypeInfo.INT_TYPE_INFO);
        result.setup();
        result.open();
        if (savedState != null) {
            result.initializeState(savedState);
        }
        return result;
    }
}


==================================================================================================
TimerFunction.java
==================================================================================================
package com.scaleunlimited.flinkcrawler.functions;

import java.util.List;

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings("serial")
public class TimerFunction
    extends KeyedProcessFunction<Integer, Integer, Integer> {
    static final Logger LOGGER = LoggerFactory.getLogger(TimerFunction.class);
   
    List<Long> _firedTimers;
    long _period;

    public TimerFunction(List<Long> firedTimers) {
        this(firedTimers, 1);
    }

    public TimerFunction(List<Long> firedTimers, long period) {
        super();
        _firedTimers = firedTimers;
        _period = period;
    }

    @Override
    public void onTimer(long timestamp,
                        KeyedProcessFunction<Integer, Integer, Integer>.OnTimerContext context,
                        Collector<Integer> out) throws Exception {
        super.onTimer(timestamp, context, out);
        _firedTimers.add(timestamp);
        long nextTimestamp = timestamp + _period;
        LOGGER.info("Firing at {}; Setting new timer for {}",
                    timestamp,
                    nextTimestamp);
        context.timerService().registerProcessingTimeTimer(nextTimestamp);
    }

    @Override
    public void processElement( Integer input,
                                KeyedProcessFunction<Integer, Integer, Integer>.Context context,
                                Collector<Integer> out)
        throws Exception {
        
        LOGGER.info("Processing input {}", input);
        if (_firedTimers.isEmpty()) {
            long firstTimestamp = 
                context.timerService().currentProcessingTime() + _period;
            LOGGER.info("Setting initial timer for {}",
                        firstTimestamp);
            context.timerService().registerProcessingTimeTimer(firstTimestamp);
        }
        
        out.collect(input);
    }
}



--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra