Hi Izual,

Thanks for reporting this! I'm also forwarding this to the user mailing list, as that is the more suitable place for this question. I think the usability of the State Processor API in Scala is indeed something that hasn't been looked at closely yet.

On Tue, Jan 21, 2020 at 8:12 AM izual <[hidden email]> wrote:
> Hi community,

This part is explainable. The "magic-implicits" actually happen in the DataStream Scala API: any primitive Scala types will be inferred and serialized as their Java counterparts. AFAIK, this does not happen in the State Processor API yet, and that is why you are getting the StateMigrationException.

When using Scala types directly with the State Processor API, I would guess that Kryo (as a generic fallback) was being used to access state. This can probably be confirmed by looking at the exception stack trace. Can you post a full copy of that?

This should be resolvable by properly supporting Scala in the State Processor API; it's just that up to this point we haven't had a plan for that. Can you open a JIRA for this? I think it'll be a reasonable extension to the API.
I'm not sure what you mean here. Where is this keyBy happening? In the Scala DataStream job, or the State Processor API?
Cheers,
Gordon
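To make the Kryo-fallback explanation more concrete, here is a toy Java model (not Flink code) of reflective key-type extraction. Everything in it is hypothetical: `ReaderFunction` stands in for a State Processor API reader function such as `KeyedStateReaderFunction`, and `chooseKeySerializer` is a simplified serializer choice, not Flink's actual type extraction. The sketch assumes, as we understand Scala's compilation, that a Scala class extending a generic type with `scala.Long` as a type argument records `java.lang.Object` in its Java generic signature, so a reflective extractor cannot see that the key is a `Long`.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Toy model only: these classes are hypothetical stand-ins, not Flink's
// KeyedStateReaderFunction or its type extraction.
public class KeyTypeExtractionSketch {

    // Stand-in for a reader function whose key type K must be discovered reflectively.
    abstract static class ReaderFunction<K, OUT> {}

    // Reads the first generic type argument declared on the function's superclass,
    // similar in spirit to how a reflective type extractor inspects a subclass.
    static Type extractKeyType(ReaderFunction<?, ?> fn) {
        Type superType = fn.getClass().getGenericSuperclass();
        if (superType instanceof ParameterizedType) {
            return ((ParameterizedType) superType).getActualTypeArguments()[0];
        }
        return Object.class; // no generic information available
    }

    // Simplified serializer choice: a dedicated serializer for java.lang.Long,
    // otherwise a generic (Kryo-like) fallback.
    static String chooseKeySerializer(Type keyType) {
        return Long.class.equals(keyType) ? "LongSerializer" : "Kryo (generic fallback)";
    }

    public static void main(String[] args) {
        // Java subclass: its generic signature records java.lang.Long.
        ReaderFunction<Long, String> javaStyle = new ReaderFunction<Long, String>() {};
        // Simulates a Scala subclass declared with scala.Long, whose generic
        // signature (by our assumption) records java.lang.Object instead.
        ReaderFunction<Object, String> scalaStyle = new ReaderFunction<Object, String>() {};

        System.out.println(chooseKeySerializer(extractKeyType(javaStyle)));
        System.out.println(chooseKeySerializer(extractKeyType(scalaStyle)));
    }
}
```

Run as-is, it prints `LongSerializer` and then `Kryo (generic fallback)`. In the real API, the usual way around such a mismatch is to supply the key's type information explicitly instead of relying on extraction; it is worth checking whether the `readKeyedState` overloads in your Flink version accept explicit `TypeInformation` arguments.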
Sorry for the wrong post.
> This can probably be confirmed by looking at the exception stack trace.
> Can you post a full copy of that?

I missed the history jobs, but I think you are right. When I debugged the program to find the reason, I came across this code snippet:

```
TypeSerializerSchemaCompatibility<T> result =
        previousSerializerSnapshot.resolveSchemaCompatibility(registeredSerializer);
if (result.isIncompatible()) {
    invalidateCurrentSchemaSerializerAccess();
}
```

I remember one is `org.apache.flink.api.common.typeutils.base.LongSerializer$LongSerializerSnapshot`, and the other is just `Kryo`.

> Can you open a JIRA for this? I think it'll be a reasonable extension to
> the API.

I'll do that, thanks.

> I'm not sure what you mean here. Where is this keyBy happening? In the
> Scala DataStream job, or the State Processor API?

In the Scala DataStream job, the same as the examples of link-1 in the original post. When I change keyBy(_._1) to keyBy(0), the program throws an exception. The full copy from the job's Exceptions tab:

```
java.io.IOException: Failed to restore state backend
    at org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:231)
    at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:177)
    at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:79)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
    at org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:223)
    ... 6 more
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for 8f89af64b0cf95ff20b8dda264c66f81_8f89af64b0cf95ff20b8dda264c66f81_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
    ... 7 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
    at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 9 more
Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible.
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:142)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
    ... 13 more
```

Maybe it's explainable by "inferred and serialized as their Java counterparts", but I'm not sure; I am a beginner with Java, Scala, and Flink alike. Thanks a lot.

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
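The check izual quoted, together with the final `StateMigrationException: The new key serializer must be compatible.` in the trace, can be modelled roughly as below. This is a toy sketch, not Flink's implementation: `SerializerSnapshot`, `Compatibility`, `ToyStateMigrationException` and `restoreKeyedState` are hypothetical names, and the rule that a snapshot is compatible only with the very same serializer is a simplifying assumption. It is set up to mirror the case izual observed: a `LongSerializerSnapshot` stored in the savepoint versus a Kryo serializer registered when reading.

```java
// Toy model (not Flink code) of the restore-time key serializer check.
// All names below are hypothetical stand-ins.
public class KeySerializerCheckSketch {

    enum Compatibility { COMPATIBLE_AS_IS, INCOMPATIBLE }

    // Stand-in for the key serializer snapshot stored in the savepoint.
    static final class SerializerSnapshot {
        private final String serializerName;

        SerializerSnapshot(String serializerName) {
            this.serializerName = serializerName;
        }

        // Simplifying assumption: only the same serializer can read the old key bytes.
        Compatibility resolveSchemaCompatibility(String registeredSerializer) {
            return serializerName.equals(registeredSerializer)
                    ? Compatibility.COMPATIBLE_AS_IS
                    : Compatibility.INCOMPATIBLE;
        }
    }

    static final class ToyStateMigrationException extends RuntimeException {
        ToyStateMigrationException(String message) {
            super(message);
        }
    }

    // Fails the restore when the key serializer registered by the reading job
    // cannot read what the writing job stored.
    static void restoreKeyedState(SerializerSnapshot previous, String registeredKeySerializer) {
        if (previous.resolveSchemaCompatibility(registeredKeySerializer) == Compatibility.INCOMPATIBLE) {
            throw new ToyStateMigrationException("The new key serializer must be compatible.");
        }
    }

    public static void main(String[] args) {
        // Snapshot written by the DataStream job (keys serialized with a Long serializer).
        SerializerSnapshot written = new SerializerSnapshot("LongSerializer");

        restoreKeyedState(written, "LongSerializer");
        System.out.println("restore with LongSerializer: ok");

        try {
            restoreKeyedState(written, "KryoSerializer");
        } catch (ToyStateMigrationException e) {
            System.out.println("restore with KryoSerializer: " + e.getMessage());
        }
    }
}
```

The first restore succeeds, while the second fails with the same message as the root cause in the stack trace: the keys in the savepoint can only be read back with a key serializer that matches the one they were written with.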
In reply to this post by Tzu-Li (Gordon) Tai
Hi Izual,

Thanks for contributing to and improving the documentation. The PR will be picked up as part of our regular maintenance work, and the communication will happen through the PR conversation as soon as someone picks it up.

Best,
Matthias

On Tue, Sep 1, 2020 at 8:44 AM izual <[hidden email]> wrote:
Matthias Pohl | Engineer

Follow us @VervericaData Ververica
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton Wehner