Unit Test for KeyedProcessFunction with out-of-core state


Unit Test for KeyedProcessFunction with out-of-core state

Alexey Trenikhun
Hello,
I want to unit test a KeyedProcessFunction that uses out-of-core state (like RocksDB).
Does Flink have a mock for RocksDB that can be used in unit tests?

Thanks,
Alexey

Re: Unit Test for KeyedProcessFunction with out-of-core state

Tzu-Li (Gordon) Tai
Hi Alexey,

Is there a specific reason why you want to test against RocksDB? 

Otherwise, in Flink's own tests we use a `KeyedOneInputStreamOperatorTestHarness` [1], which lets you wrap a user function without having to set up a heavy runtime context or dependencies such as the state backend.
As a unit test, this should be sufficient to implement basic test scenarios for your function, such as checking the expected output and state for given inputs.
Does this provide what you are looking for?

Cheers,
Gordon

[1] https://github.com/apache/flink/blob/1d5f44710270d1c615537f0d05ab49e699d3a6e5/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java


Re: Unit Test for KeyedProcessFunction with out-of-core state

Alexey Trenikhun
Hi Gordon,
We already use [1]. Unfortunately, it doesn't let us detect out-of-core-specific bugs like this one:
POJO v = myMapState.get(myKey);
v.setStatus(1);
return;
// missing myMapState.put(myKey, v);
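To make the failure mode concrete, here is a small self-contained sketch in plain Java with no Flink dependency. `Pojo`, `Store`, `ReferenceStore`, `SerializingStore` and `MissingPutDemo` are hypothetical names used only for illustration. It models why a heap-based test misses this bug: a heap backend hands back the stored object itself, so an in-place mutation sticks, whereas RocksDB keeps only serialized bytes and returns a fresh copy on every read, so the mutation is lost unless the value is written back with `put`.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the POJO kept in MapState.
class Pojo {
    private int status;
    int getStatus() { return status; }
    void setStatus(int status) { this.status = status; }
}

// Minimal key/value view of the state, just enough for this illustration.
interface Store {
    Pojo get(String key);
    void put(String key, Pojo value);
}

// Heap-like store: get() returns the very object that was put().
class ReferenceStore implements Store {
    private final Map<String, Pojo> map = new HashMap<>();
    public Pojo get(String key) { return map.get(key); }
    public void put(String key, Pojo value) { map.put(key, value); }
}

// Out-of-core-like store: values are kept only as bytes, so every get()
// returns a freshly decoded copy (toy encoding of the single int field).
class SerializingStore implements Store {
    private final Map<String, byte[]> map = new HashMap<>();

    public Pojo get(String key) {
        byte[] bytes = map.get(key);
        if (bytes == null) return null;
        Pojo copy = new Pojo();
        copy.setStatus(ByteBuffer.wrap(bytes).getInt());
        return copy;
    }

    public void put(String key, Pojo value) {
        map.put(key, ByteBuffer.allocate(Integer.BYTES).putInt(value.getStatus()).array());
    }
}

class MissingPutDemo {
    // The snippet from the message: mutates the value but never writes it back.
    static void buggyUpdate(Store state, String key) {
        Pojo v = state.get(key);
        v.setStatus(1);
        // missing state.put(key, v);
    }

    // Corrected version: the modified value is written back explicitly.
    static void fixedUpdate(Store state, String key) {
        Pojo v = state.get(key);
        v.setStatus(1);
        state.put(key, v);
    }

    // Runs one update against a fresh entry and returns the stored status.
    static int statusAfter(Store state, boolean useFix) {
        state.put("k", new Pojo());
        if (useFix) {
            fixedUpdate(state, "k");
        } else {
            buggyUpdate(state, "k");
        }
        return state.get("k").getStatus();
    }
}
```

With `ReferenceStore` the buggy update appears to work (the stored status becomes 1); with `SerializingStore` it leaves the status at 0, and only `fixedUpdate` stores 1.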

Thanks,
Alexey



Re: Unit Test for KeyedProcessFunction with out-of-core state

Dawid Wysakowicz-2

Hi Alexey,

There is no mock for RocksDB. Moreover, I am not sure what the use case for one would be. If you want to test specifically against RocksDB, you can use it in the test harness Gordon mentioned.


Re: Unit Test for KeyedProcessFunction with out-of-core state

Arvid Heise-3
Hi Alexey,

The definitions of test levels are always a bit blurry when writing tests for a data processing framework, but I'm convinced that in your case you should think in terms of integration tests rather than unit tests:
* Unit tests should really just be about business logic.
* If a test is about specific implementation details of other components, it should go into an integration test instead.
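One way to follow the first point, sketched here under assumptions (the `Status` type, the `StatusLogic.onEvent` rule and the "activate" event are hypothetical), is to keep the business rule in a pure function that takes the current value and returns a new one. Because the value is immutable, it cannot be changed in place, so a forgotten write-back leaves stale state on any backend, heap included, and the rule itself can be unit tested without a state backend at all.

```java
// Hypothetical immutable value: "changing" it yields a new instance.
final class Status {
    final int value;
    Status(int value) { this.value = value; }
    Status withValue(int newValue) { return new Status(newValue); }
}

// Hypothetical business rule with no Flink or state-backend types in its
// signature, so a plain unit test can cover it.
final class StatusLogic {
    private StatusLogic() {}

    // An "activate" event sets the status to 1; any other event keeps the
    // current status. A missing entry starts at 0.
    static Status onEvent(Status current, String event) {
        Status base = (current == null) ? new Status(0) : current;
        return "activate".equals(event) ? base.withValue(1) : base;
    }
}

// In the KeyedProcessFunction, the only state interaction would then be a
// single read-modify-write line, e.g.:
//   myMapState.put(myKey, StatusLogic.onEvent(myMapState.get(myKey), event));
```

The state plumbing that remains (the single `get`/`put` line) is then the part worth covering with an integration test.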

You can still structure your code so that only half of the pipeline, or even just one step, is executed in an ITCase, but doing all the mocking is much harder than simply executing a small Flink program with a local runner. ITCases are really fast and, unlike mocked components, will not limit the portability of your program to newer Flink versions.

Another idea for your specific use case would be to implement a backend that delegates to the heap (in-memory) backend but copies all values on retrieval.
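The copy-on-retrieval idea can be sketched without Flink as a heap map that hands out defensive copies. `CopyOnReadMapState` and `Pojo.copy()` are hypothetical names, and this is a simplified stand-in rather than a real Flink state backend; in an actual backend the copier could, for example, come from the state's `TypeSerializer#copy`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical, simplified stand-in for MapState: values live on the heap,
// but callers only ever see copies, mimicking an out-of-core backend.
class CopyOnReadMapState<K, V> {
    private final Map<K, V> delegate = new HashMap<>();
    private final UnaryOperator<V> copier;

    CopyOnReadMapState(UnaryOperator<V> copier) { this.copier = copier; }

    // Returns a copy, so mutating the result does not change stored state.
    V get(K key) {
        V value = delegate.get(key);
        return value == null ? null : copier.apply(value);
    }

    // Stores a copy, so later mutations of the caller's object are not
    // visible either (RocksDB serializes on put in the same way).
    void put(K key, V value) {
        delegate.put(key, copier.apply(value));
    }
}

// Hypothetical mutable POJO like the one in the buggy snippet.
class Pojo {
    int status;
    Pojo(int status) { this.status = status; }
    Pojo copy() { return new Pojo(status); }
}
```

Because every `get` returns a copy, the missing `put` from the snippet earlier in the thread becomes visible in a test: the mutation is lost, just as it would be with RocksDB.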


--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   

Re: Unit Test for KeyedProcessFunction with out-of-core state

Alexey Trenikhun
Thank you for the ideas.
Do you suggest using the new backend in a unit test or in an integration test?

Thanks,
Alexey



Re: Unit Test for KeyedProcessFunction with out-of-core state

Arvid Heise-3
The new backend would be for unit tests (instead of a RocksDB mock). It's essentially the mock for out-of-core behavior that you initially asked for.

To use RocksDB in an ITCase with multiple task managers, you would adjust the configuration in the usual MiniCluster setup, for example [1].

Note that you can do the same with the test harness [2], but I'd recommend the test harness only for testing new operators or complex ProcessFunctions (e.g., ones using timers), not for a simple map. The test harness is a non-public API, and we need to adjust it from time to time to reflect refactorings of the operators.




Re: Unit Test for KeyedProcessFunction with out-of-core state

Alexey Trenikhun
Unfortunately, it looks impossible to change the backend of AbstractStreamOperatorTestHarness without resorting to reflection. stateBackend is initialized in the constructor as `this.stateBackend = new MemoryStateBackend();`. Since it is protected, I can change it in a derived class, but checkpointStorage has already been initialized from the original backend (`this.checkpointStorage = this.stateBackend.createCheckpointStorage(new JobID());`), and checkpointStorage is private.
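The constraint described above comes down to ordinary Java initialization order. The sketch below uses hypothetical classes (`BaseHarness`, `RocksHarness` and the string values are illustrative, not Flink's code): the superclass constructor derives a private field from a protected one before the subclass constructor body runs, so reassigning the protected field afterwards does not update the derived value.

```java
// Hypothetical base class mirroring the pattern: a protected field is set in
// the constructor and a private field is derived from it right away.
class BaseHarness {
    protected String stateBackend;
    private final String checkpointStorage;

    BaseHarness() {
        this.stateBackend = "memory";
        this.checkpointStorage = "storage-for-" + stateBackend;
    }

    String checkpointStorage() { return checkpointStorage; }
}

// The subclass body runs only after super() has finished, so the private
// derived field still reflects the original backend.
class RocksHarness extends BaseHarness {
    RocksHarness() {
        super();
        this.stateBackend = "rocksdb";
    }
}
```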

Thanks,
Alexey
