Adding keyed state to test harness before calling process function.


Marco Villalobos-2
Hi,

I would like to add keyed state to a test harness before calling the process function.

I am using the OneInputStreamOperatorTestHarness.

I can't find any examples online on how to do that, and I am struggling to figure this out.

Can somebody please provide guidance?  My test case has keyed state pre-populated as one of its pre-conditions.

Thank you.  Sincerely,

Marco

Re: Adding keyed state to test harness before calling process function.

Guowei Ma
Hi Marco,
I think you could look at testScalingUp() in flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java
Best,
Guowei



Re: Adding keyed state to test harness before calling process function.

Marco Villalobos-2
Thank you. I looked into that, but that test does not initialize any values in keyed state. Instead, it only uses keyed state, and lines 407-412 show that it does not set keyed state values in advance; it handles null values when the state has not been set:

public void processElement(String value, Context ctx, Collector<Integer> out) throws Exception {
    Integer oldCount = counterState.value();
    Integer newCount = oldCount != null ? oldCount + 1 : 1;
    counterState.update(newCount);
    out.collect(newCount);
}

What I mean by initializing keyed state is that I want to call processElement with values already present in that state.


Re: Adding keyed state to test harness before calling process function.

Guowei Ma
Hi Marco,
I have not found a direct way to do this (maybe others know of one).

One possible approach:
1. Build the expected keyed state in one `xxOperatorTestHarness` and get the `OperatorSubtaskState` from it.
2. Use that `OperatorSubtaskState` to initialize the `xxOperatorTestHarness` under test.
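
The two steps above can be sketched roughly as follows. This is only an illustration under several assumptions, not a confirmed recipe from the thread: it assumes the keyed variant of the harness (`KeyedOneInputStreamOperatorTestHarness`), a Flink 1.11-era API with the flink-streaming-java and flink-runtime test utilities on the classpath, and a hypothetical `CountingFunction` modelled on the counter shown earlier in the thread. The class and method names `PrePopulatedStateSketch`, `newHarness`, and `run` are made up for the example.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;

import java.util.List;

public class PrePopulatedStateSketch {

    /** Hypothetical function under test: counts the elements seen per key. */
    static class CountingFunction extends KeyedProcessFunction<String, String, Integer> {
        private transient ValueState<Integer> counterState;

        @Override
        public void open(Configuration parameters) {
            counterState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("counter", Integer.class));
        }

        @Override
        public void processElement(String value, Context ctx, Collector<Integer> out)
                throws Exception {
            Integer oldCount = counterState.value();
            int newCount = oldCount != null ? oldCount + 1 : 1;
            counterState.update(newCount);
            out.collect(newCount);
        }
    }

    /** Builds a fresh harness; each element is used as its own key. */
    static KeyedOneInputStreamOperatorTestHarness<String, String, Integer> newHarness()
            throws Exception {
        return new KeyedOneInputStreamOperatorTestHarness<>(
                new KeyedProcessOperator<>(new CountingFunction()),
                value -> value,
                Types.STRING);
    }

    static List<Integer> run() throws Exception {
        // Step 1: drive a "seeding" harness until the keyed state holds the
        // desired values, then snapshot it.
        KeyedOneInputStreamOperatorTestHarness<String, String, Integer> seeding = newHarness();
        seeding.open();
        seeding.processElement("a", 0L);
        seeding.processElement("a", 1L); // counter for key "a" is now 2
        OperatorSubtaskState seededState = seeding.snapshot(0L, 0L);
        seeding.close();

        // Step 2: restore the snapshot into the harness under test before
        // open(), so processElement sees the pre-populated state.
        KeyedOneInputStreamOperatorTestHarness<String, String, Integer> underTest = newHarness();
        underTest.initializeState(seededState);
        underTest.open();
        underTest.processElement("a", 2L);
        List<Integer> output = underTest.extractOutputValues();
        underTest.close();
        return output;
    }
}
```

In this sketch the state is seeded by pushing elements through the same function, which only works when the desired state is reachable through normal processing. If it is not, a separate seeding function that registers the same state descriptor name and type could write the values directly, but that variant is untested here.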

Best,
Guowei

