Hello,
I want to unit test a KeyedProcessFunction that uses out-of-core state (like RocksDB).
Does Flink have a mock for RocksDB that can be used in unit tests?
Thanks,
Alexey
|
Hi Alexey,
Is there a specific reason why you want to test against RocksDB? Otherwise, in Flink tests we use a `KeyedOneInputStreamOperatorTestHarness` [1] that allows you to wrap a user function and eliminates the need to set up a heavy runtime context and dependencies such as the state backend. As a unit test, this should be sufficient for implementing basic test scenarios for your function, such as expected output for given inputs, state, etc. Does this provide what you are looking for?
Cheers,
Gordon
[1] https://github.com/apache/flink/blob/1d5f44710270d1c615537f0d05ab49e699d3a6e5/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
On Fri, Sep 4, 2020 at 2:28 AM Alexey Trenikhun <[hidden email]> wrote:
|
Hi Gordon,
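We already use [1]. Unfortunately, it doesn't allow us to detect out-of-core-specific bugs like this one:
POJO v = myMapState.get(myKey);
v.setStatus(1);
return;
// missing myMapState.put(myKey, v);
With the heap-based harness, get() returns a reference to the stored object, so the mutation appears to be persisted even without the put; with RocksDB, get() returns a deserialized copy and the update is silently lost.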
Thanks,
Alexey
From: Tzu-Li (Gordon) Tai <[hidden email]>
Sent: Friday, September 4, 2020 12:35:48 AM
To: Alexey Trenikhun <[hidden email]>
Cc: Flink User Mail List <[hidden email]>
Subject: Re: Unit Test for KeyedProcessFunction with out-of-core state
|
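To make the difference Alexey describes concrete, here is a minimal, self-contained Java model with no Flink dependency. It is a sketch under stated assumptions: `SimpleMapState`, `ReferenceMapState`, `SerializingMapState`, and `statusAfterUpdate` are hypothetical names, not Flink APIs. `ReferenceMapState` hands back the stored object itself, approximating a heap backend; `SerializingMapState` keeps only serialized bytes and deserializes on every `get`, approximating RocksDB. Running the update from the question against both shows that only the serializing variant exposes the missing `put`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

public class LostUpdateDemo {

    // Illustrative value type mirroring the POJO from the question.
    static class Pojo implements Serializable {
        private static final long serialVersionUID = 1L;
        private int status;

        int getStatus() { return status; }
        void setStatus(int status) { this.status = status; }
    }

    // Hypothetical minimal map-state contract; not Flink's MapState interface.
    interface SimpleMapState<K, V> {
        V get(K key);
        void put(K key, V value);
    }

    // Heap-like behavior: get() returns the very object that was stored.
    static class ReferenceMapState<K, V> implements SimpleMapState<K, V> {
        private final Map<K, V> map = new HashMap<>();

        public V get(K key) { return map.get(key); }
        public void put(K key, V value) { map.put(key, value); }
    }

    // RocksDB-like behavior: only bytes are kept, so get() returns a fresh copy.
    static class SerializingMapState<K, V extends Serializable> implements SimpleMapState<K, V> {
        private final Map<K, byte[]> map = new HashMap<>();

        public V get(K key) {
            byte[] bytes = map.get(key);
            if (bytes == null) {
                return null;
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                @SuppressWarnings("unchecked")
                V value = (V) in.readObject();
                return value;
            } catch (IOException | ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }

        public void put(K key, V value) {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            map.put(key, bytes.toByteArray());
        }
    }

    // Runs the update from the question; writeBack toggles the missing put.
    static int statusAfterUpdate(SimpleMapState<String, Pojo> state, boolean writeBack) {
        state.put("myKey", new Pojo());      // stored with status 0
        Pojo v = state.get("myKey");
        v.setStatus(1);
        if (writeBack) {
            state.put("myKey", v);           // the put that is missing in the question
        }
        return state.get("myKey").getStatus();
    }

    public static void main(String[] args) {
        System.out.println(statusAfterUpdate(new ReferenceMapState<>(), false));   // 1: bug hidden
        System.out.println(statusAfterUpdate(new SerializingMapState<>(), false)); // 0: update lost
        System.out.println(statusAfterUpdate(new SerializingMapState<>(), true));  // 1: fixed
    }
}
```

The model only captures the reference-versus-copy semantics; it says nothing about how Flink's actual backends are implemented internally.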
Hi Alexey,
There is no mock for RocksDB. Moreover, I am not sure what the use case for one would be. If you want to test specifically against RocksDB, you can use it in the test harness Gordon mentioned.
On 04/09/2020 16:31, Alexey Trenikhun wrote:
|
Hi Alexey,
Definitions of test levels are always a bit blurry when writing tests for a data processing framework, but I'm convinced that in your case you should think in terms of integration tests rather than unit tests:
* Unit tests should really just be about business logic.
* If a test is about specific implementation details of other components, it should rather go into an integration test.
You can still structure your code so that only half of the pipeline, or even just one step, is executed in an ITCase, but doing all the mocking is much harder than simply executing a small Flink program with a local runner. ITCases are really fast and, unlike mocking of components, will not limit the portability of your program to newer Flink versions.
Another idea for your specific use case would be to implement a backend that delegates to HeapMemory but copies all values on retrieval.
On Mon, Sep 7, 2020 at 5:49 PM Dawid Wysakowicz <[hidden email]> wrote:
-- Arvid Heise | Senior Java Developer Follow us @VervericaData -- Join Flink Forward - The Apache Flink Conference Stream Processing | Event Driven | Real Time -- Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany -- Ververica GmbHRegistered at Amtsgericht Charlottenburg: HRB 158244 B Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng |
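Arvid's suggestion of a backend that delegates to heap storage but copies all values on retrieval can be sketched at the level of a single map state. This is a plain-Java sketch, not a Flink state backend: `CopyOnReadMapState` and its `copier` parameter are hypothetical. In a real backend the copy would presumably come from the state's serializer (Flink's `TypeSerializer#copy`), but that wiring is an assumption. Values are also copied on `put`, so that later mutation of the caller's object cannot leak into stored state, which mirrors RocksDB serializing on write.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

public class CopyOnReadDemo {

    // Illustrative value type with a copy constructor that serves as the copier.
    static class Pojo {
        private int status;

        Pojo() {}
        Pojo(Pojo other) { this.status = other.status; }

        int getStatus() { return status; }
        void setStatus(int status) { this.status = status; }
    }

    // Hypothetical wrapper: keeps values in an ordinary heap map but never
    // hands out (or keeps) a reference the caller can mutate in place.
    static class CopyOnReadMapState<K, V> {
        private final Map<K, V> delegate = new HashMap<>();
        private final UnaryOperator<V> copier;

        CopyOnReadMapState(UnaryOperator<V> copier) {
            this.copier = copier;
        }

        V get(K key) {
            V value = delegate.get(key);
            return value == null ? null : copier.apply(value); // copy on retrieval
        }

        void put(K key, V value) {
            delegate.put(key, copier.apply(value)); // copy on write
        }
    }

    public static void main(String[] args) {
        CopyOnReadMapState<String, Pojo> state = new CopyOnReadMapState<>(Pojo::new);
        state.put("myKey", new Pojo());

        Pojo v = state.get("myKey");
        v.setStatus(1);
        // missing state.put("myKey", v);
        System.out.println(state.get("myKey").getStatus()); // 0: the lost update is visible

        state.put("myKey", v);
        System.out.println(state.get("myKey").getStatus()); // 1
    }
}
```

The trade-off is an extra copy per access, which is acceptable in tests but is exactly the cost the heap backend avoids in production.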
Thank you for the ideas.
Do you suggest using the new backend in unit tests or in integration tests?
Thanks,
Alexey
From: Arvid Heise <[hidden email]>
Sent: Monday, September 14, 2020 4:26:47 AM
To: Dawid Wysakowicz <[hidden email]>
Cc: Alexey Trenikhun <[hidden email]>; Flink User Mail List <[hidden email]>
Subject: Re: Unit Test for KeyedProcessFunction with out-of-core state
|
The new backend would be for unit tests (instead of a RocksDB mock). It's essentially the mock for out-of-core behavior that you initially requested.
To use RocksDB in an ITCase with multiple task managers, you would adjust the configuration in the usual MiniCluster setup, for example [1].
Note that you can do the same with the test harness [2], but I'd recommend the test harness only for testing new operators or complex ProcessFunctions (e.g., those using timers), not for a simple map. The test harness is a non-public API, and we need to adjust it from time to time to reflect refactorings of the operators.
On Tue, Sep 15, 2020 at 4:18 AM Alexey Trenikhun <[hidden email]> wrote:
|
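Arvid's point that unit tests should be about business logic can be illustrated by moving the state transition out of the KeyedProcessFunction into a plain method that takes the current value and returns the value to store. This is one possible structure, not something proposed in the thread: `Pojo`, `withStatus`, and `onEvent` are hypothetical names. Making the value immutable means an update cannot take effect without an explicit write-back, which removes the class of bug from the question by construction. A `HashMap` stands in for `MapState` here.

```java
import java.util.HashMap;
import java.util.Map;

public class BusinessLogicDemo {

    // Illustrative immutable value: changing the status produces a new object,
    // so stored state cannot be "updated" without writing the new object back.
    static final class Pojo {
        private final int status;

        Pojo(int status) { this.status = status; }

        int getStatus() { return status; }
        Pojo withStatus(int newStatus) { return new Pojo(newStatus); }
    }

    // Hypothetical pure business-logic step: no Flink types, no state backend.
    static Pojo onEvent(Pojo current) {
        Pojo base = (current == null) ? new Pojo(0) : current;
        return base.withStatus(1);
    }

    public static void main(String[] args) {
        // The logic can be checked directly, without any harness.
        System.out.println(onEvent(null).getStatus()); // 1

        // Inside processElement this would read roughly as
        // myMapState.put(myKey, onEvent(myMapState.get(myKey)));
        // here a HashMap stands in for the map state.
        Map<String, Pojo> state = new HashMap<>();
        state.put("myKey", onEvent(state.get("myKey")));
        System.out.println(state.get("myKey").getStatus()); // 1
    }
}
```

With this split, the ProcessFunction's own job shrinks to get, call, put, which is the part better covered by an ITCase or the test harness.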
Unfortunately, it looks impossible to change the backend of AbstractStreamOperatorTestHarness without resorting to reflection.
stateBackend is initialized in the constructor as `this.stateBackend = new MemoryStateBackend();`. Since it is protected, I can change it in a derived class, but checkpointStorage has already been initialized using the original
backend (`this.checkpointStorage = this.stateBackend.createCheckpointStorage(new JobID());`), and checkpointStorage
is private.
Thanks,
Alexey
From: Arvid Heise <[hidden email]>
Sent: Monday, September 14, 2020 11:14 PM
To: Alexey Trenikhun <[hidden email]>
Cc: Dawid Wysakowicz <[hidden email]>; Flink User Mail List <[hidden email]>
Subject: Re: Unit Test for KeyedProcessFunction with out-of-core state
|