Unit Testing State Stores in KeyedProcessFunctions

Rion Williams
Hi all!

Is it possible to apply assertions against the underlying state stores within a KeyedProcessFunction using the existing KeyedOneInputStreamOperatorTestHarness class in unit tests? Basically, I wanted to ensure that if I passed in two elements, each with a unique key, I could query the underlying state stores to verify they were working as expected. I don’t see a mechanism that supports this (i.e., "give me the state store for key n from the operator").

@Test
fun `Verify that instances with different keys retain separate watermarks`() {
    // Arrange
    val logs = listOf(
        StreamRecord(TestGenerator.generateLog(tenant = "A")),
        StreamRecord(TestGenerator.generateLog(tenant = "B")),
    )

    // Act
    magicWindowHarness
        .processElements(logs)

    // Assert (I'd like to access the state by key for each here)
    assert(magicWindowHarness.getStateForKey("A"), ...)
    assert(magicWindowHarness.getStateForKey("B"), ...)
}


Is something like this possible, or is there a better way to access the underlying state store? It seemed to work as expected when only a single key was involved, but when multiple keys were involved, things seemed to fall apart. The current testing documentation [0] is fantastic; however, I think this may be a more advanced task than it covers.

At present, all of the function's state stores are declared private, which may or may not be relevant:

@Transient private lateinit var watermark: ValueState<Long>
@Transient private lateinit var scheduledEvictions: MapState<Long, Long>


Any recommendations or advice would be greatly appreciated, and I'm happy to provide any additional context or details as needed.

Thanks a lot!

Rion

[0]: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
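In Flink, a keyed state handle such as `watermark` does not hold a value itself; reads and writes resolve against the key of the element currently being processed. The plain-Kotlin sketch below is a toy model of that scoping, not Flink's API: `ToyKeyedBackend`, `ToyValueState`, `ToyWatermarkFunction`, and `watermarkFor` are all hypothetical names. It shows why "give me the state for key n" has to switch the current key before reading the handle.

```kotlin
// Toy model, NOT Flink's API: one value is stored per (state name, key), and every
// read/write is resolved against whichever key is currently active.
class ToyKeyedBackend<K : Any> {
    var currentKey: K? = null
    private val store = mutableMapOf<Pair<String, K>, Any?>()

    fun <V> valueState(name: String): ToyValueState<K, V> = ToyValueState<K, V>(this, name)

    internal fun read(name: String): Any? = store[name to activeKey()]

    internal fun write(name: String, value: Any?) {
        store[name to activeKey()] = value
    }

    private fun activeKey(): K = checkNotNull(currentKey) { "no current key set" }
}

// A handle like the `watermark` field in the question: it only knows its state name.
class ToyValueState<K : Any, V>(private val backend: ToyKeyedBackend<K>, private val name: String) {
    @Suppress("UNCHECKED_CAST")
    fun value(): V? = backend.read(name) as V?

    fun update(value: V) = backend.write(name, value)
}

// Hypothetical stand-in for the function under test: keeps the highest timestamp per tenant.
class ToyWatermarkFunction(private val backend: ToyKeyedBackend<String>) {
    private val watermark = backend.valueState<Long>("watermark")

    fun processElement(tenant: String, timestamp: Long) {
        backend.currentKey = tenant // in the toy, this mimics the runtime setting the key
        val current = watermark.value() ?: Long.MIN_VALUE
        watermark.update(maxOf(current, timestamp))
    }
}

// "State for key n": switch the current key first, then read through a handle.
fun watermarkFor(backend: ToyKeyedBackend<String>, tenant: String): Long? {
    backend.currentKey = tenant
    return backend.valueState<Long>("watermark").value()
}
```

In the toy, reading the handle without switching keys returns whatever the last-processed key stored, which is one plausible reason a single-key test can pass while a multi-key one does not.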

Re: Unit Testing State Stores in KeyedProcessFunctions

Chesnay Schepler
I do not believe this to be possible.

Given that the state will likely affect the behavior of the function in some form (usually with regard to what it outputs), it may be a better idea to test for that. (I suppose you'd want tests like that anyway.)
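A minimal sketch of the output-based approach, in plain Kotlin with no Flink dependency. It assumes a hypothetical function (`ToyKeyedEmitter`) that emits a `(tenant, watermark)` record whenever a tenant's watermark advances; the assertions look only at emitted records, never at state. With the real harness, the equivalent assertions would run against the records returned by its `getOutput()`.

```kotlin
// Hypothetical keyed function: emits (tenant, newWatermark) whenever that tenant's
// watermark advances. The map stands in for per-key ValueState; tests never read it.
class ToyKeyedEmitter {
    private val watermarks = mutableMapOf<String, Long>()
    val output = mutableListOf<Pair<String, Long>>()

    fun processElement(tenant: String, timestamp: Long) {
        val previous = watermarks[tenant]
        if (previous == null || timestamp > previous) {
            watermarks[tenant] = timestamp
            output.add(tenant to timestamp) // the observable effect of the state change
        }
    }
}

// Black-box helper: per-key isolation is verified indirectly through each key's output.
fun emittedFor(emitter: ToyKeyedEmitter, tenant: String): List<Long> =
    emitter.output.filter { it.first == tenant }.map { it.second }
```

If state leaked between keys, an out-of-order element for one tenant would suppress or add output for the other, so the per-key output lists catch the same bug a direct state assertion would.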

On 3/3/2021 8:10 PM, Rion Williams wrote:
[...]



Re: Unit Testing State Stores in KeyedProcessFunctions

Rion Williams
Thanks Chesnay,

I agree that output testing is more practical and far less brittle; I was just curious whether support existed for it. I have a specific use case where I’m managing my own windows and may schedule something to be emitted after a processing-time delay, so it could be valuable to see this scheduling in state, since it may not directly coincide with output.

Not a huge deal: I already have black-box tests in place that verify output; it was more a question of whether this was supported.

Thanks much,

Rion
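For a delayed emission like the one described above, one way to check the scheduling through output alone is to control time: the testing documentation [0] shows the harness advancing processing time with `setProcessingTime(...)` to fire processing-time timers. The plain-Kotlin sketch below models that idea; `ToyDelayedEmitter`, its manual clock, and its `setProcessingTime` are hypothetical stand-ins, not Flink classes. The test asserts "nothing emitted yet" before the delay and the expected records after it.

```kotlin
// Hypothetical function that schedules an emission `delayMs` after each element.
// A manually advanced clock replaces Flink's processing-time timer service.
class ToyDelayedEmitter(private val delayMs: Long) {
    private var now = 0L
    // Fire time -> tenants due at that time (stand-in for registered timers).
    private val timers = sortedMapOf<Long, MutableList<String>>()
    val output = mutableListOf<String>()

    fun processElement(tenant: String) {
        timers.getOrPut(now + delayMs) { mutableListOf() }.add(tenant)
    }

    // Mirrors the idea of advancing processing time on a harness:
    // every timer at or before the new time fires, in time order.
    fun setProcessingTime(time: Long) {
        now = time
        while (timers.isNotEmpty() && timers.firstKey() <= now) {
            val due = timers.remove(timers.firstKey())!!
            output.addAll(due)
        }
    }
}
```

Asserting an empty output just before the deadline and the expected output at the deadline pins down when the emission was scheduled, without reading the function's state.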

On Mar 3, 2021, at 2:44 PM, Chesnay Schepler wrote:
[...]