Transition Flink job from Java to Scala with state migration

Daksh Talwar
Hello,

We run a Flink application built on the DataStream API, written in Java, on Flink 1.10.0.
While moving this job to Scala (for reasons unrelated to Flink or this application), we get the following error when trying to start the Scala job from a savepoint taken from the Java job:

org.apache.flink.util.StateMigrationException: The new state typeSerializer for operator state must not be incompatible.

We can see that this error occurs while instantiating a ValueState<T> state in our KeyedProcessFunction.
The KeyedProcessFunction instantiates two ValueState objects: ValueState<Long> and ValueState<MyCustomObject>. Our hunch is that the latter is the one failing to load from the savepoint.

We have ensured that the KeyedProcessFunction has the same id in both jobs, and the new job is run with the `--allowNonRestoredState` flag to discard state for other operators.
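
For readers following along, the matching step described above can be pictured with a toy model in plain Java. This is only a sketch, not Flink code: `UidRestoreSketch`, `restore`, and the uid strings are hypothetical names, and the state payloads are placeholder strings. It shows the general idea that savepoint state is matched to operators by id, and that state with no matching operator is dropped only when non-restored state is allowed.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class UidRestoreSketch {

    // Toy model: keep the savepoint entries whose operator id exists in the new job.
    // Entries without a matching operator are skipped only if allowNonRestoredState is true.
    static Map<String, String> restore(Map<String, String> savepointState,
                                       Set<String> newJobUids,
                                       boolean allowNonRestoredState) {
        Map<String, String> restored = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : savepointState.entrySet()) {
            if (newJobUids.contains(entry.getKey())) {
                restored.put(entry.getKey(), entry.getValue());
            } else if (!allowNonRestoredState) {
                throw new IllegalStateException(
                        "No operator in the new job for state with id " + entry.getKey());
            }
            // Otherwise the unmatched state is silently dropped.
        }
        return restored;
    }

    public static void main(String[] args) {
        // Hypothetical operator ids; the values stand in for serialized state bytes.
        Map<String, String> savepoint = new LinkedHashMap<>();
        savepoint.put("keyed-process-fn", "<state bytes>");
        savepoint.put("old-sink", "<state bytes>");

        Set<String> scalaJobUids = Collections.singleton("keyed-process-fn");
        System.out.println(restore(savepoint, scalaJobUids, true).keySet());
    }
}
```

Matching ids only decides which operator receives the state. The bytes still have to be readable by the serializer the new job declares for that state, which is a separate check.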

Has anyone in the community attempted this before, and faced such issues or succeeded?

Cheers,
Daksh

-----------------------------------------------------------------------------------------

This email and any files transmitted with it are confidential and intended solely for the use of the individual or entity to whom they are addressed. If you have received this email in error, please notify the system manager. This message contains confidential information and is intended only for the individual named. If you are not the named addressee, you should not disseminate, distribute or copy this email. Please notify the sender immediately by email if you have received this email by mistake and delete this email from your system. If you are not the intended recipient, you are notified that disclosing, copying, distributing or taking any action in reliance on the contents of this information is strictly prohibited.

 

Any views or opinions presented in this email are solely those of the author and do not necessarily represent those of the organization. Any information on shares, debentures or similar instruments, recommended product pricing, valuations and the like are for information purposes only. It is not meant to be an instruction or recommendation, as the case may be, to buy or to sell securities, products, services nor an offer to buy or sell securities, products or services unless specifically stated to be so on behalf of the Flipkart group. Employees of the Flipkart group of companies are expressly required not to make defamatory statements and not to infringe or authorise any infringement of copyright or any other legal right by email communications. Any such communication is contrary to organizational policy and outside the scope of the employment of the individual concerned. The organization will not accept any liability in respect of such communication, and the employee responsible will be personally liable for any damages or other liability arising.

 

Our organization accepts no liability for the content of this email, or for the consequences of any actions taken on the basis of the information provided, unless that information is subsequently confirmed in writing. If you are not the intended recipient, you are notified that disclosing, copying, distributing or taking any action in reliance on the contents of this information is strictly prohibited.

-----------------------------------------------------------------------------------------

Re: Transition Flink job from Java to Scala with state migration

Andrey Zagrebin-5
Hi Daksh,

You need to find out which type causes the problem: Long, MyCustomObject, or maybe something else. You could share the logs with the full exception stack trace.
My guess is that your Scala code uses a different serializer for the failing type. See also the docs to understand serialization in Flink [1].
When you restore state after changing the job, the new serializer for a state type must be compatible with the serializer that wrote the state in the previous version of the job [2].
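
To make the compatibility rule concrete, here is a toy model in plain Java. It is not Flink's API: `Snapshot`, `Compatibility`, and `resolve` are hypothetical names, and the format and schema-version strings are made up for illustration. The sketch assumes only the general idea that a savepoint remembers how each state was written, and that on restore this is compared with the serializer the new job supplies. Flink's real check is richer than this.

```java
import java.util.Objects;

public class SerializerCompatSketch {

    // Possible outcomes of comparing the old and the new serializer (toy version).
    enum Compatibility { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

    // What a savepoint would remember about the serializer that wrote a state.
    static final class Snapshot {
        final String format;        // illustrative label, e.g. "avro" or "kryo"
        final String schemaVersion; // hypothetical schema fingerprint

        Snapshot(String format, String schemaVersion) {
            this.format = Objects.requireNonNull(format);
            this.schemaVersion = Objects.requireNonNull(schemaVersion);
        }
    }

    // Compare the snapshot stored with the state against the new job's serializer.
    static Compatibility resolve(Snapshot written, Snapshot restoring) {
        if (!written.format.equals(restoring.format)) {
            // A different serialization format cannot read the old bytes.
            return Compatibility.INCOMPATIBLE;
        }
        if (!written.schemaVersion.equals(restoring.schemaVersion)) {
            // Same format, changed schema: old state must be converted first.
            return Compatibility.COMPATIBLE_AFTER_MIGRATION;
        }
        return Compatibility.COMPATIBLE_AS_IS;
    }

    public static void main(String[] args) {
        Snapshot oldJob = new Snapshot("avro", "v1");
        System.out.println(resolve(oldJob, new Snapshot("avro", "v1")));
        System.out.println(resolve(oldJob, new Snapshot("avro", "v2")));
        System.out.println(resolve(oldJob, new Snapshot("kryo", "v1")));
    }
}
```

In this model, only the last case corresponds to the "must not be incompatible" failure: the new job offers a serializer of a different kind than the one that wrote the state.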

I am also cc'ing Gordon, who may have more ideas about the problem.

Best,

On Tue, Aug 25, 2020 at 4:43 PM Daksh Talwar <[hidden email]> wrote:
> [...]

Re: Transition Flink job from Java to Scala with state migration

Daksh Talwar
Thanks for the pointers, Andrey; they helped us zero in on the issue.
We found that a bug in our Scala code caused Flink to use the Kryo serializer to read Avro-based state.
We've fixed it and can now evolve the state schema seamlessly.

Cheers,
Daksh


On Tue, Aug 25, 2020 at 9:53 PM Andrey Zagrebin <[hidden email]> wrote:
> [...]