Writing a POJO Schema Evolution E2E test in Java

Writing a POJO Schema Evolution E2E test in Java

Theo
Hi,

We have a pipeline which internally uses Java POJOs and also needs to keep some events entirely in state for some time.

From time to time, our POJOs evolve, like attributes are added or removed.

Now I wanted to write an E2E test that proves the schema migration works (having different schemas in the source Kafka topic, the Flink pipeline state, and the sink) for bounded scenarios (attribute added or removed).

I figured out that in my test, I can instantiate a MiniClusterWithClientResource, receive a client, start a job over the client and also cancel the job with a savepoint. My idea was to start the job, put some records in, cancel with a savepoint and restart the job from savepoint, but with a slightly different POJO (added another attribute and removed an existing one).

Currently, I'm sadly missing two pieces:
1. I don't see a way to restart a job from a savepoint via the client obtained from the MiniClusterWithClientResource in my test.
2. According to a Flink blog post [1], schema evolution of POJOs is more limited; in particular, the evolved POJO must have the same "namespace" (i.e. Java package?) and class name.

Especially point 2 seems to make it impossible for me to automate testing of the evolution, so I would need to do it manually.

Do you have any idea how I could overcome these limitations so that I can build a proper end-to-end test for the schema migration?

Best regards
Theo

[1] https://flink.apache.org/news/2020/01/29/state-unlocked-interacting-with-state-in-apache-flink.html
Re: Writing a POJO Schema Evolution E2E test in Java

Tzu-Li (Gordon) Tai
Hi Theo,

This is indeed a tricky feature to test!

On Thu, Feb 20, 2020 at 8:59 PM Theo Diefenthal <[hidden email]> wrote:
Currently, I'm sadly missing two pieces:
1. I don't see a way to restart a job from a savepoint via the client obtained from the MiniClusterWithClientResource in my test.
2. According to a Flink blog post [1], schema evolution of POJOs is more limited; in particular, the evolved POJO must have the same "namespace" (i.e. Java package?) and class name.
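Regarding point 1: one approach that might work (a sketch only, untested; the method names assume the Flink 1.10-era client API, and signatures vary across versions) is to set the savepoint path on the job graph before resubmitting it through the MiniCluster's client. Here `savepointPath` would be the path returned by the earlier cancel-with-savepoint call, and `miniClusterResource` the MiniClusterWithClientResource from the test:

```java
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch, assuming Flink 1.10-era APIs.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ... build the pipeline against the evolved POJO ...
JobGraph jobGraph = env.getStreamGraph().getJobGraph();

// Tell the job to start from the savepoint taken before the POJO change.
jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
miniClusterResource.getClusterClient().submitJob(jobGraph);
```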

Regarding point 2: the way tests in Flink surrounding schema / serializer evolution sort of overcome this is to have two different classes (with different class names) and reload them in new classloaders, so that they can be "relocated" to have the same name at runtime.
In Flink, we use a `ClassRelocator` utility to do this. You can check out example usages of it in `PojoSerializerUpgradeTest` and `TypeSerializerUpgradeTestBase`.

I'm not entirely sure if it would work in your scenario, but I think it's worth giving it a try since it'll make writing such tests easier.

If this doesn't work, you could instead keep separate modules (i.e. jars) for the old / new POJO definitions, plus a separate module containing the actual test logic, which loads the jars with the old / new POJOs in different classloaders.
That would resemble what happens in reality more closely.
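To illustrate the classloader trick outside of Flink: the following self-contained sketch compiles two versions of a POJO with the same fully-qualified name and loads each in its own `URLClassLoader` (it requires a JDK, since it uses the `javax.tools` compiler; all class and variable names here are made up for illustration):

```java
import javax.tools.ToolProvider;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;

public class SameNameTwoLoaders {

    // Compile a single source file defining class "MyEvent" into a fresh temp dir.
    static Path compile(String source) throws Exception {
        Path dir = Files.createTempDirectory("pojo-version");
        Path src = dir.resolve("MyEvent.java");
        Files.write(src, source.getBytes());
        // Requires a JDK: getSystemJavaCompiler() returns null on a plain JRE.
        ToolProvider.getSystemJavaCompiler().run(null, null, null, src.toString());
        return dir;
    }

    public static void main(String[] args) throws Exception {
        // "Old" and "new" versions of the same class name with different fields.
        Path oldDir = compile("public class MyEvent { public int a; }");
        Path newDir = compile("public class MyEvent { public int a; public String b; }");

        // Parent classloader is null so neither version leaks in from the app classpath.
        try (URLClassLoader oldCl = new URLClassLoader(new URL[]{oldDir.toUri().toURL()}, null);
             URLClassLoader newCl = new URLClassLoader(new URL[]{newDir.toUri().toURL()}, null)) {
            Class<?> oldPojo = oldCl.loadClass("MyEvent");
            Class<?> newPojo = newCl.loadClass("MyEvent");

            // Same name at runtime, but two distinct classes with different field sets.
            System.out.println(oldPojo.getName().equals(newPojo.getName())); // true
            System.out.println(oldPojo.getDeclaredFields().length);          // 1
            System.out.println(newPojo.getDeclaredFields().length);          // 2
        }
    }
}
```

The same idea carries over to the two-jars setup: each jar contributes its own version of the class, and the test harness decides which classloader (and hence which schema) is live when the job is submitted or restored.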


Hope that helps! Would be great to hear back from you on how it works out.

Cheers,
Gordon