Window operator schema evolution - savepoint deserialization fail

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Window operator schema evolution - savepoint deserialization fail

tisonet
Hi,

I run simple streaming job where I compute hourly impressions for campaigns:

.keyBy(imp => imp.campaign_id)
.window(TumblingEventTimeWindows.of(...))
.aggregate(new BudgetSpendingByImpsAggregateFunction(), new
BudgetSpendingByImpsWindowFunction())

Where aggregate function just sums impressions:

class BudgetSpendingByImpsAggregateFunction extends
AggregateFunction[ImpressionEvent, BudgetSpending, BudgetSpending]{
    override def add(value: ImpressionEvent, accumulator: BudgetSpending):
BudgetSpending = {
        accumulator + value
    }
...
}

BudgetSpending is just simple scala case class accumulator

case class BudgetSpending(var impressions: Int = 0){
    def +(imp: ImpressionEvent): BudgetSpending =_
}


I am trying to add a new counter to BudgetSpending accumulator class:

@SerialVersionUID(-7854299638715463891L)
case class BudgetSpending(var impressions: Int = 0, var spent: Double = 0)

But when I run the job with savepoint from previous version I get error:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
        at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
        at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
state backend for WindowOperator_74af6afa20f38ce575bfc2d1386aa434_(1/2) from
any of the 1 provided restore options.
        at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
        at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:276)
        at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:132)
        ... 5 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
        at
ga.p2502.streams.flink.budget.spending.HourlyBudgetSpendingJob$$anon$18$$anon$8.createInstance(HourlyBudgetSpendingJob.scala:60)
        at
ga.p2502.streams.flink.budget.spending.HourlyBudgetSpendingJob$$anon$18$$anon$8.createInstance(HourlyBudgetSpendingJob.scala:60)
        at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:133)
        at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
        at
org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:133)
        at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:430)
        at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:315)
        at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:95)
        at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
        at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
        ... 7 more


How I can solve this problem? Thanks a lot.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Window operator schema evolution - savepoint deserialization fail

Andrey Zagrebin
Hi,

Adding a new field to a case class breaks serialisation format in savepoint at the moment and requires state migration which is currently not supported in Flink implicitly.

Although, I would expect the failure earlier while performing the compatibility check upon restore.
According to the source code, the compatibility check in CaseClassSerializer is inherited from TupleSerializerBase
which does not check the change of field number, because I think it is not expected in a certain tuple class.

I will cc Gordon and Aljoscha, maybe they will add more details.

Best,
Andrey


> On 14 Sep 2018, at 09:48, tisonet <[hidden email]> wrote:
>
> Hi,
>
> I run simple streaming job where I compute hourly impressions for campaigns:
>
> .keyBy(imp => imp.campaign_id)
> .window(TumblingEventTimeWindows.of(...))
> .aggregate(new BudgetSpendingByImpsAggregateFunction(), new
> BudgetSpendingByImpsWindowFunction())
>
> Where aggregate function just sums impressions:
>
> class BudgetSpendingByImpsAggregateFunction extends
> AggregateFunction[ImpressionEvent, BudgetSpending, BudgetSpending]{
>    override def add(value: ImpressionEvent, accumulator: BudgetSpending):
> BudgetSpending = {
>        accumulator + value
>    }
> ...
> }
>
> BudgetSpending is just simple scala case class accumulator
>
> case class BudgetSpending(var impressions: Int = 0){
>    def +(imp: ImpressionEvent): BudgetSpending =_
> }
>
>
> I am trying to add a new counter to BudgetSpending accumulator class:
>
> @SerialVersionUID(-7854299638715463891L)
> case class BudgetSpending(var impressions: Int = 0, var spent: Double = 0)
>
> But when I run the job with savepoint from previous version I get error:
>
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for WindowOperator_74af6afa20f38ce575bfc2d1386aa434_(1/2) from
> any of the 1 provided restore options.
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:276)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:132)
> ... 5 more
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
> at
> ga.p2502.streams.flink.budget.spending.HourlyBudgetSpendingJob$$anon$18$$anon$8.createInstance(HourlyBudgetSpendingJob.scala:60)
> at
> ga.p2502.streams.flink.budget.spending.HourlyBudgetSpendingJob$$anon$18$$anon$8.createInstance(HourlyBudgetSpendingJob.scala:60)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:133)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
> at
> org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:133)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:430)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:315)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:95)
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
> ... 7 more
>
>
> How I can solve this problem? Thanks a lot.
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply | Threaded
Open this post in threaded view
|

Re: Window operator schema evolution - savepoint deserialization fail

tisonet
Hi Andrey

thanks for answer.

It seems that is not possible to handle case class evolution in version
which I currently use (1.5.3).
Do you have any recommendation how to avoid such problem in future? Adding a
new field with default value to an existing class seems to me as a common
use case. Can I use custom class, pojo class or Avro serde?


Thanks Zdenek.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Window operator schema evolution - savepoint deserialization fail

Andrey Zagrebin
Hi Zdenek,

the schema evolution can be tricky in general.
I would suggest to plan possible schema extensions in advance,
use more custom serialisation and make sure that it supports the required types of evolution.
E.g. some custom Avro serialiser might tolerate better adding a field with a default value.
You could have a look into TypeSerializer abstract class and its API for TypeSerializer.snapshotConfiguration and TypeSerializer.ensureCompatibility. In certain cases state migration might be unavoidable.
At the moment the community is working on better support of state migration by Flink.
Currently you still might have to write e.g. your own job to convert/migrate state in savepoint.

Best,
Andrey


> On 17 Sep 2018, at 10:24, tisonet <[hidden email]> wrote:
>
> Hi Andrey
>
> thanks for answer.
>
> It seems that is not possible to handle case class evolution in version
> which I currently use (1.5.3).
> Do you have any recommendation how to avoid such problem in future? Adding a
> new field with default value to an existing class seems to me as a common
> use case. Can I use custom class, pojo class or Avro serde?
>
>
> Thanks Zdenek.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/