Hi,
I run a simple streaming job where I compute hourly impressions for campaigns:

    .keyBy(imp => imp.campaign_id)
    .window(TumblingEventTimeWindows.of(...))
    .aggregate(new BudgetSpendingByImpsAggregateFunction(), new BudgetSpendingByImpsWindowFunction())

where the aggregate function just sums impressions:

    class BudgetSpendingByImpsAggregateFunction extends AggregateFunction[ImpressionEvent, BudgetSpending, BudgetSpending] {
      override def add(value: ImpressionEvent, accumulator: BudgetSpending): BudgetSpending = {
        accumulator + value
      }
      ...
    }

BudgetSpending is just a simple Scala case class accumulator:

    case class BudgetSpending(var impressions: Int = 0) {
      def +(imp: ImpressionEvent): BudgetSpending = _
    }

I am trying to add a new counter to the BudgetSpending accumulator class:

    @SerialVersionUID(-7854299638715463891L)
    case class BudgetSpending(var impressions: Int = 0, var spent: Double = 0)

But when I run the job with a savepoint from the previous version, I get this error:

    java.lang.Exception: Exception while creating StreamOperatorStateContext.
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_74af6afa20f38ce575bfc2d1386aa434_(1/2) from any of the 1 provided restore options.
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:276)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:132)
        ... 5 more
    Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
        at ga.p2502.streams.flink.budget.spending.HourlyBudgetSpendingJob$$anon$18$$anon$8.createInstance(HourlyBudgetSpendingJob.scala:60)
        at ga.p2502.streams.flink.budget.spending.HourlyBudgetSpendingJob$$anon$18$$anon$8.createInstance(HourlyBudgetSpendingJob.scala:60)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:133)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
        at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:133)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:430)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:315)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:95)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
        ... 7 more

How can I solve this problem? Thanks a lot.

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Hi,
Adding a new field to a case class changes the serialisation format in the savepoint and requires state migration, which Flink does not currently support implicitly.

That said, I would have expected the failure earlier, during the compatibility check upon restore. Looking at the source code, CaseClassSerializer inherits its compatibility check from TupleSerializerBase, which does not check for a change in the number of fields, presumably because the arity of a given tuple class is not expected to change. I will cc Gordon and Aljoscha; maybe they can add more details.

Best,
Andrey
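To illustrate this point, here is a minimal pure-Scala sketch (no Flink dependencies; all names are hypothetical) of why a fixed-arity serialiser cannot restore bytes written for a smaller case class: it writes one value per field with no field count or version header, so the byte layout is implicitly tied to the class arity, and the new serialiser runs off the end of the old payload. The exact exception in Flink differs (the generated createInstance throws ArrayIndexOutOfBoundsException), but the root cause is the same arity mismatch.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// V1 of the accumulator: one Int field.
case class BudgetSpendingV1(impressions: Int)
// V2 adds a field; its serialiser now expects an extra Double in the stream.
case class BudgetSpendingV2(impressions: Int, spent: Double)

object AritySketch {
  // Writes one value per field, with no header, like a fixed-arity serialiser.
  def writeV1(b: BudgetSpendingV1): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeInt(b.impressions) // only the fields V1 knows about
    out.flush()
    bytes.toByteArray
  }

  // Restoring a V1 payload with the V2 serialiser: the read of the second
  // field runs past the end of the V1-encoded bytes and throws EOFException.
  def readV2(data: Array[Byte]): BudgetSpendingV2 = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    BudgetSpendingV2(in.readInt(), in.readDouble())
  }
}
```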
Hi Andrey
thanks for your answer. It seems it is not possible to handle case class evolution in the version I currently use (1.5.3). Do you have any recommendation for how to avoid this problem in the future? Adding a new field with a default value to an existing class seems to me like a common use case. Can I use a custom class, a POJO class, or Avro serde?

Thanks,
Zdenek
Hi Zdenek,
schema evolution can be tricky in general. I would suggest planning possible schema extensions in advance, using a more custom serialisation, and making sure it supports the required kinds of evolution. For example, a custom Avro serialiser might tolerate adding a field with a default value better. You could have a look at the TypeSerializer abstract class and its API, in particular TypeSerializer.snapshotConfiguration and TypeSerializer.ensureCompatibility.

In certain cases state migration may be unavoidable. The community is currently working on better support for state migration in Flink; for now you might still have to write, e.g., your own job to convert/migrate the state in a savepoint.

Best,
Andrey
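As a concrete sketch of "planning schema extensions in advance", one option is to write a schema version into the serialised form and fall back to defaults for fields an older version did not write. This is plain Scala with hypothetical names, not Flink's TypeSerializer API; the same idea carries over to a custom TypeSerializer, or to Avro, where field defaults in the schema serve the same role.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

case class BudgetSpending(impressions: Int = 0, spent: Double = 0.0)

object VersionedSerde {
  val CurrentVersion = 2

  // Prefix the payload with a schema version so old payloads stay readable.
  def serialize(b: BudgetSpending): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeByte(CurrentVersion)
    out.writeInt(b.impressions)
    out.writeDouble(b.spent)
    out.flush()
    bytes.toByteArray
  }

  // Dispatch on the version byte; fields the old version never wrote
  // keep their case-class defaults.
  def deserialize(data: Array[Byte]): BudgetSpending = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    in.readByte().toInt match {
      case 1 => BudgetSpending(impressions = in.readInt()) // 'spent' defaults to 0.0
      case 2 => BudgetSpending(in.readInt(), in.readDouble())
      case v => throw new IllegalStateException(s"unknown schema version $v")
    }
  }
}
```

With this layout, a payload written before the `spent` field existed still restores cleanly, at the cost of one extra byte per record and a small dispatch on read.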