Schema Evolution on Dynamic Schema

Schema Evolution on Dynamic Schema

shkob1
Hey,

My job is built on SQL that is injected as an input to the job, so let's take
an example:

Select a, max(b) as MaxB, max(c) as MaxC FROM registered_table GROUP BY a

(Side note: in order for the state not to grow indefinitely, I'm transforming
to a retracted stream and filtering based on a custom trigger.)

In order to get the output in JSON format, I basically created a way to
dynamically generate a class and register it with the class loader, so when
transforming to the retracted stream I'm doing something like:

Table result = tableEnv.sqlQuery(sqlExpression);
tableEnv.toRetractStream(result, Row.class, config)
    .filter(tuple -> tuple.f0)
    .map(new RowToDynamicClassMapper(sqlSelectFields))
    .addSink(..)

This actually works pretty well (though I do need to make sure to register
the dynamic class with the class loader whenever the state is loaded).
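
(Roughly, the mapper does something like the following - this is only a simplified, hypothetical sketch, since the real class is generated at runtime; class and field names here are made up:)

import java.util.List;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

// Simplified sketch: copy Row values into the dynamically generated class by position.
public class RowToDynamicClassMapper implements MapFunction<Tuple2<Boolean, Row>, Object> {

    private final List<String> sqlSelectFields;

    public RowToDynamicClassMapper(List<String> sqlSelectFields) {
        this.sqlSelectFields = sqlSelectFields;
    }

    @Override
    public Object map(Tuple2<Boolean, Row> retractTuple) throws Exception {
        Row row = retractTuple.f1;
        // "com.example.DynamicRecord" stands in for the class generated and registered at runtime.
        Object target = Class.forName("com.example.DynamicRecord").newInstance();
        for (int i = 0; i < sqlSelectFields.size(); i++) {
            // field i of the generated class is filled from position i of the Row
            target.getClass().getField(sqlSelectFields.get(i)).set(target, row.getField(i));
        }
        return target;
    }
}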

I'm now looking into "schema evolution" - which basically means what happens
when the query is changed (say max(c) is removed, and maybe max(d) is
added). I don't know whether that fits the classic "schema evolution" feature or
whether it should be thought about differently. Would be happy to get some
thoughts.

Thanks!

Re: Schema Evolution on Dynamic Schema

Rong Rong
Hi Shahar,

I wasn't sure which schema you are describing that is going to "evolve" (is it the registered_table? or the output sink?). It would be great if you could clarify more.

For the example you provided, IMO it is more of a logic change than schema evolution:
- if you are changing max(c) to max(d) in your query, I don't think this qualifies as schema evolution.
- if you are adding another column "max(d)" to your query along with your existing "max(c)", that might be considered a backward compatible change.
However, in either case you will have to restart your job. You can also consult how state schema evolution works [1], and there are many other problems that can be tricky as well [2,3].

Thanks,
Rong

Re: Schema Evolution on Dynamic Schema

shkob1
Thanks for the response Rong. Would be happy to clarify more.
So there are two possible changes that could happen:
  1. There could be a change in the incoming source schema. Since there's a deserialization phase here (JSON -> Pojo), I expect a couple of options. Backward compatible changes to the JSON should not have an impact (as the Pojo is the same); however, we might want to change the Pojo itself, which I believe is a state-evolving action. I do want to migrate the Pojo to Avro - will that suffice for the schema evolution feature to work?
  2. The other possible change is to the SQL select fields: as mentioned, someone could add/delete/reorder a field in the SQL SELECT. I do see this as an issue given the way I transform the Row object to the dynamically generated class. Today this is done through the indices of the class fields and those of the Row object. That seems like a problem when, for example, a select field is added in the middle and an older Row's field order no longer matches the (new) generated class's field order. I'm thinking about how to solve that one - I imagine this is not something the schema evolution feature can solve (am I right?). I'm wondering whether I could instead transform the Row object to my generated class by matching the Row's column names to the generated class field names, though I don't see that the Row object has any notion of column names (one idea is sketched below).
Would love to hear your thoughts. If you want me to paste some code here I can do so.
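
(For illustration, one rough idea - this assumes the Table#getSchema()/TableSchema#getFieldNames() API and is just a sketch, not code I actually run:)

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.table.api.Table;

// Resolve the select-field names from the result table instead of relying on positions.
Table result = tableEnv.sqlQuery(sqlExpression);
String[] fieldNames = result.getSchema().getFieldNames();

// Build a name -> index lookup once and hand it to the mapper,
// so a Row can be read by column name even if the column order changes.
Map<String, Integer> nameToIndex = new HashMap<>();
for (int i = 0; i < fieldNames.length; i++) {
    nameToIndex.put(fieldNames[i], i);
}
// inside the mapper: row.getField(nameToIndex.get("MaxB")) instead of row.getField(1)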

Shahar

Re: Schema Evolution on Dynamic Schema

Rong Rong
Hi Shahar,

1. Are you referring to the incoming data source being published as JSON, with a customized Pojo source function / table source that converts it? In that case it is you who maintains the schema evolution support, am I correct? For Avro I think you can refer to [1].
2. If you change the SQL, you will have to recompile and rerun your job. This means the new compilation of the SQL will yield the correct logic to run against your new schema, so I don't foresee this to be an issue. For the second problem: yes, it is your customized serialization sink function's responsibility to convert Row into the output class objects. I am not sure if [2] is the piece of code you are looking for if you are using Avro, but you might be able to leverage that (see the sketch below)?

If you are sticking with your own format of generated/dynamic class, you might have to create that in your custom source/sink table.
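
(Just a sketch of what I mean, assuming the flink-avro module's AvroRowSerializationSchema is available in your version; the Avro schema string and the sink are placeholders for your actual setup:)

import org.apache.flink.formats.avro.AvroRowSerializationSchema;
import org.apache.flink.types.Row;

// Serialize each retracted Row to Avro bytes using a schema that mirrors the SELECT fields.
// "avroSchemaJson" is a placeholder for a schema generated alongside the query.
AvroRowSerializationSchema avroSerializer = new AvroRowSerializationSchema(avroSchemaJson);

tableEnv.toRetractStream(result, Row.class, config)
    .filter(tuple -> tuple.f0)
    .map(tuple -> avroSerializer.serialize(tuple.f1))  // byte[] per record
    .addSink(..)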

Thanks,
Rong


Re: Schema Evolution on Dynamic Schema

shkob1
Thanks Rong,

I made a quick test changing the SQL select (adding a select field
in the middle) and reran the job from a savepoint, and it worked without any
errors. I want to make sure I understand at what point the state is
stored and how it works.

Let's simplify the scenario and forget my specific case of a dynamically
generated pojo. Let's focus on the generic steps of:
Source -> register table -> SQL select and group by session -> retracted stream
(Row) -> transformToPojo (custom Map function) -> pushToSink

And let's assume the SQL select is changed (a field is added somewhere in
the middle of the select fields).
So: we had intermediate results in the old format that are loaded from
state into the new Row object in the retracted stream. Is that an accurate
statement? At what operator/format is the state stored in this case? Is it
the SQL result/Row? Is it the Pojo? As this scenario does not fail for me, I'm
trying to understand how/where it is handled in Flink.

Re: Schema Evolution on Dynamic Schema

Rong Rong
Hi Shahar,

From my understanding, if you use "group by" with aggregate functions, they save the accumulators to SQL internal state, which is invariant to your input schema. Based on what you described, I think that's why recovering from existing state is fine.
I think one confusion you might have is the "toRetractStream" syntax. This actually passes the "retracting" flag to the Flink planner to indicate how the DataStream operators get generated from your SQL.

So in my understanding, there's really no "state" associated with the "retracting stream" itself, but rather with the generated operators.
However, I am not an expert in Table/SQL state recovery: I recall there was an open JIRA [1] that might be related to your question regarding SQL/Table generated operator recovery. Maybe @Fabian can provide more insight here?

Regarding the rest of the pipeline, both the "filter" and "map" operators are stateless; sink state recovery depends on what you do there.

--
Rong


Re: Schema Evolution on Dynamic Schema

shkob1
Thanks Guys,

I actually got an error now when adding some fields to the select statement:

java.lang.RuntimeException: Error while getting state
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible.
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341)
at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
... 9 more

Does that mean I should move from having a Pojo storing the result of the SQL retracted stream to Avro? I'm trying to understand how to mitigate it.

Thanks

Re: Schema Evolution on Dynamic Schema

shkob1
Or is it the SQL state that is incompatible?

Re: Schema Evolution on Dynamic Schema

Fabian Hueske-2
Hi,

Restarting a changed query from a savepoint is currently not supported.
In general this is a very difficult problem as new queries might result in completely different execution plans.
The special case of adding and removing aggregates is easier to solve, but the schema of the stored state changes and we would need to analyze the previous and current query and generate compatible serializers.
So far we have not explored this rabbit hole.

Also, starting a different query from a savepoint can lead to weird result semantics.
I'd recommend bootstrapping the state of the new query from scratch.

Best, Fabian

Re: Schema Evolution on Dynamic Schema

shkob1
Thanks Fabian,

I'm thinking about how to work around that issue, and one thing that came to mind is to create a map that holds keys & values that can be edited without changing the schema, though I'm still figuring out how to implement it in Calcite.
Consider the following original SQL, in which "metrics" can be added/deleted/renamed:
Select a, sum(b) as metric_sum_b, sum(c) as metric_sum_c
Group by a

I'm looking at both json_objectagg and map to change it, but it seems that json_objectagg is only in a later Calcite version, and map doesn't work for me.
Trying something like
Select a, map(sum(b) as metric_sum_b, sum(c) as metric_sum_c) as metric_map
group by a

results in "Non-query expression encountered in illegal context".
Is my train of thought the right one? If so, do I have a mistake in the way I'm trying to implement it?

Thanks!

Re: Schema Evolution on Dynamic Schema

shkob1
My bad, it actually did work with
Select a, map['sumB', sum(b), 'sumC', sum(c)] as metric_map
group by a

Do you think that's OK as a workaround? The main schema wouldn't change that way - only the keys in the map would.

Re: Schema Evolution on Dynamic Schema

Fabian Hueske-2
Hi,

I think this would work.
However, you should be aware that all keys are kept forever in state (unless you configure idle state retention time [1]).
This includes previous versions of keys.
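
(For reference, a minimal sketch of configuring that, assuming the Flink 1.7/1.8 StreamQueryConfig API; the retention times below are arbitrary examples:)

import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.types.Row;

// State for a grouping key that has been idle longer than the maximum retention time is dropped.
StreamQueryConfig queryConfig = tableEnv.queryConfig();
queryConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));

tableEnv.toRetractStream(result, Row.class, queryConfig)
    .filter(tuple -> tuple.f0)
    .addSink(..)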

Also note that we are not guaranteeing savepoint compatibility across Flink versions yet.
If the state of the aggregation operator changes in a later version (say Flink 1.9.x), it might not be possible to migrate to a later Flink version.
Compatibility for bugfix releases (1.8.0 -> 1.8.1) is of course provided.

Best,
Fabian


Re: Schema Evolution on Dynamic Schema

shkob1
Hi Fabian,

It seems like it didn't work.
Let me specify what I have done:

I have a SQL query that looks something like:
Select a, sum(b), map[ 'sum_c', sum(c), 'sum_d', sum(d)] as my_map FROM...
GROUP BY a

As you said, I'm preventing keys from staying in state forever by using idle state
retention time (plus I'm transforming to a retracted stream along with a custom
trigger that sends the data to the sink).

I tried adding a new item to the map (say 'sum_e', sum(e)), cancelled with a
savepoint and reran from that savepoint, and got the same error as above
about state incompatibility.

Why do you think that would happen?

Thanks,
Shahar

Re: Schema Evolution on Dynamic Schema

shkob1
Debugging locally, it seems like the state descriptor of "GroupAggregateState"
is creating an additional field serializer (a TupleSerializer for the SumAccumulator)
within the RowSerializer. I'm guessing this is what's causing the
incompatibility? Is there any workaround I can do?

Re: Schema Evolution on Dynamic Schema

shkob1
Sorry to flood this thread, but to keep documenting my experiments:

So far I've been retracting to a Row and then mapping to a dynamic pojo
that is created (using ByteBuddy) according to the select fields in the SQL.
Considering the error, I'm now trying to remove the usage of Row and use the
dynamic type directly when converting the query result table to a retracted
stream.

However, since I don't have compile-time knowledge of the select field
types (the created pojo has them as "Object"), I don't think I can create a
Kryo serializer for them (I guess by using Row I deferred my problem from
compile time to schema-evolution time via the Row -> dynamic object
conversion).

So I guess I need to find a solution where I can either:
- figure out how to infer the type of a SQL select field based on the source
table somehow (one idea is sketched below)
- OR figure out how to create a (Kryo?) serializer that can convert to the
dynamic object in a similar way to the RowSerializer (supporting Object
fields).
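
(A sketch of the first option, assuming the Table#getSchema() API: the result table already carries the resolved field names and types of the SELECT, so the ByteBuddy builder could be fed concrete types instead of Object; names below are illustrative:)

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.Table;

Table result = tableEnv.sqlQuery(sqlExpression);
String[] fieldNames = result.getSchema().getFieldNames();
TypeInformation<?>[] fieldTypes = result.getSchema().getFieldTypes();

for (int i = 0; i < fieldNames.length; i++) {
    // e.g. feed name + type into the dynamic class builder instead of declaring Object fields
    System.out.println(fieldNames[i] + " : " + fieldTypes[i].getTypeClass().getName());
}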

Would love to hear more thoughts.

Re: Schema Evolution on Dynamic Schema

shkob1
Hmm, kinda stuck here. It seems like the SQL GROUP BY is translated to a GroupAggProcessFunction which stores state for every aggregation element (thus flattening the map items for the state store). It seems like there's no way around it. Am I wrong? Is there any way to evolve the map elements when doing SELECT map['a', sum(a), 'b', sum(b).. ] FROM.. group by .. ?

On Wed, Mar 20, 2019 at 2:00 AM Fabian Hueske <[hidden email]> wrote:
Hi,

I think this would work.
However, you should be aware that all keys are kept forever in state (unless you configure idle state retention time [1]).
This includes previous versions of keys.

Also note that we are not guaranteeing savepoint compatibility across Flink versions yet.
If the state of the aggregation operator changes in a later version (say Flink 1.9.x), it might not be possible to migrate to a later Flink version.
Compatibility for bugfix releases (1.8.0 -> 1.8.1) is of course provided.

Best,
Fabian


Am Di., 19. März 2019 um 22:27 Uhr schrieb Shahar Cizer Kobrinsky <[hidden email]>:
My bad. it actually did work with 
Select a, map['sumB', sum(b) ,'sumC' , sum(c) ) as metric_map
group by a

do you think thats OK as a workaround? main schema should be changed that way - only keys in the map

On Tue, Mar 19, 2019 at 11:50 AM Shahar Cizer Kobrinsky <[hidden email]> wrote:
Thanks Fabian,

Im thinking about how to work around that issue and one thing that came to my mind is to create a map that holds keys & values that can be edited without changing the schema, though im thinking how to implement it in Calcite.
Considering the following original SQL in which "metrics" can be added/deleted/renamed
Select a, sum(b) as metric_sum_c ,sum(c) as metric_sum_c
Group by a

im looking both at json_objectagg & map to change it but it seems that json_objectagg is on a later calcite version and map doesnt work for me.
Trying something like
Select a, map(sum(b) as metric_sum_c ,sum(c) as metric_sum_c) as metric_map
group by a

results with "Non-query expression encountered in illegal context"
is my train of thought the right one? if so, do i have a mistake in the way im trying to implement it?

Thanks!







On Tue, Mar 19, 2019 at 2:03 AM Fabian Hueske <[hidden email]> wrote:
Hi,

Restarting a changed query from a savepoint is currently not supported.
In general this is a very difficult problem as new queries might result in completely different execution plans.
The special case of adding and removing aggregates is easier to solve, but the schema of the stored state changes and we would need to analyze the previous and current query and generate compatible serializers.
So far we did not explore this rabbit hole.

Also, starting a different query from a savepoint can also lead to weird result semantics.
I'd recommend to bootstrap the state of the new query from scatch.

Best, Fabian



Am Mo., 18. März 2019 um 20:02 Uhr schrieb Shahar Cizer Kobrinsky <[hidden email]>:
Or is it the SQL state that is incompatible.. ?

On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky <[hidden email]> wrote:
Thanks Guys,

I actually got an error now adding some fields into the select statement:

java.lang.RuntimeException: Error while getting state
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible.
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341)
at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
... 9 more

Does that mean I should move from having a Pojo store the result of the SQL retracted stream to Avro? I'm trying to understand how to mitigate it.

Thanks

On Sat, Mar 9, 2019 at 4:41 PM Rong Rong <[hidden email]> wrote:
Hi Shahar,

From my understanding, if you use "group by" with aggregate functions, they save the accumulators to SQL-internal state, which is invariant to your input schema. Based on what you described, I think that's why it is fine when recovering from the existing state.
I think one confusion you might have is the "toRetractStream" syntax. This actually passes the "retracting" flag to the Flink planner to indicate how the DataStream operator gets generated based on your SQL.

So in my understanding, there's really no "state" associated with the "retracting stream", but rather state associated with the generated operators.
However, I am not an expert in Table/SQL state recovery: I recall there was an open JIRA [1] that might be related to your question regarding SQL/Table generated operator recovery. Maybe @Fabian can provide more insight here?

Regarding the rest of the pipeline, both the "filter" and "map" operators are stateless, and sink state recovery depends on what you do there.

--
Rong


On Fri, Mar 8, 2019 at 12:07 PM shkob1 <[hidden email]> wrote:
Thanks Rong,

I made a quick test changing the SQL select (adding a select field in the middle) and reran the job from a savepoint, and it worked without any errors. I want to make sure I understand at what point the state is stored and how it works.

Let's simplify the scenario and forget my specific case of a dynamically generated pojo. Let's focus on the generic steps of:
Source -> register table -> SQL select and group by session -> retracted stream (Row) -> transformToPojo (custom Map function) -> pushToSink

And let's assume the SQL select is changed (a field is added somewhere in the middle of the select list).
So: we had intermediate results in the old format that are loaded from state into the new Row object in the retracted stream. Is that an accurate statement? At what operator, and in what format, is the state stored in this case? Is it the SQL result/Row? Is it the Pojo? As this scenario does not fail for me, I'm trying to understand how/where it is handled in Flink.





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Schema Evolution on Dynamic Schema

Fabian Hueske-2
Hi Shahar,

Sorry for the late response.

The problem is not with the type of the retract stream, but with the GROUP BY aggregation operator.
The optimizer is converting the plan into an aggregation operator that computes all aggregates followed by a projection that inserts the aggregation results into a MAP type.
The problem is the state of the aggregation operator. By adding a new field to the map, the state of the operator changes and you cannot restore it.
The only workaround that I can think of would be to implement a user-defined aggregation function [1] that performs all aggregations internally, and to manually maintain state compatibility for the accumulator of that UDAGG.
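To make that concrete, here is a rough illustration of why adding an entry to the map breaks the savepoint (the accumulator shapes in the comments are assumptions about the planner's internal layout, and column d is hypothetical):

// v1: for each key 'a', the GROUP BY operator keeps one accumulator per aggregate call,
//     i.e. something shaped like (sumB_acc, sumC_acc).
String v1 =
    "SELECT a, map['sumB', sum(b), 'sumC', sum(c)] AS metric_map " +
    "FROM registered_table GROUP BY a";

// v2: adding a 'sumD' entry adds a third aggregate call, so the accumulator row
//     grows to something like (sumB_acc, sumC_acc, sumD_acc).
String v2 =
    "SELECT a, map['sumB', sum(b), 'sumC', sum(c), 'sumD', sum(d)] AS metric_map " +
    "FROM registered_table GROUP BY a";

A savepoint taken while running v1 stores the two-field accumulators, so restoring it into the v2 operator fails, which matches the StateMigrationException ("the new state serializer must not be incompatible") reported earlier in the thread.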

Best,
Fabian


On Thu., Mar 28, 2019 at 22:28, Shahar Cizer Kobrinsky <[hidden email]> wrote:
Hmm, kinda stuck here. It seems like SQL GROUP BY is translated to a GroupAggProcessFunction which stores state for every aggregation element (thus flattening the map items for the state store). Seems like there's no way around it. Am I wrong? Is there any way to evolve the map elements when doing SELECT map['a', sum(a), 'b', sum(b) ...] FROM ... GROUP BY ...?

On Wed, Mar 20, 2019 at 2:00 AM Fabian Hueske <[hidden email]> wrote:
Hi,

I think this would work.
However, you should be aware that all keys are kept forever in state (unless you configure idle state retention time [1]).
This includes previous versions of keys.

Also note that we are not guaranteeing savepoint compatibility across Flink versions yet.
If the state of the aggregation operator changes in a later version (say Flink 1.9.x), it might not be possible to migrate to a later Flink version.
Compatibility for bugfix releases (1.8.0 -> 1.8.1) is of course provided.

Best,
Fabian



Re: Schema Evolution on Dynamic Schema

shkob1
That makes sense, Fabian!
So I want to make sure I fully understand how this should look.
Would the same expression look like:

custom_groupby(my_group_fields, map[ 'a', sum(a)...])
?
Will I be able to use the built-in aggregation functions internally, such as sum/avg etc., or would I need to reimplement all such functions?
In terms of schema evolution, if these are implemented as a map state, will I be OK as new items are added to that map?

Thanks again, and congrats on an awesome conference, I learned a lot
Shahar



Re: Schema Evolution on Dynamic Schema

Fabian Hueske-2
Hi Shahar,

Thanks!

The UDAGG approach would be very manual. You could not reuse the built-in functions.
There are several ways to achieve this. One approach could be to have a map-based UDAGG for each type of aggregation that you'd like to support (SUM, COUNT, ...).
Let's say we have a sumMap function: it could take a MAP(String, Double) as an input parameter and produce a MAP(String, Double) as a result. Internally, the function would create and maintain a sum aggregate for each unique String key of the map.
The same could be done for countMap, minMap, etc.
Since the accumulator of these UDAGGs would be a map, it should be state compatible and support a growing number of aggregates. It would not be easily possible (without injecting marker records) to delete aggregates.

I don't think this would be very efficient, but should work.
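A minimal sketch of what such a sumMap function could look like on Flink 1.8's Table API (this is not code from the thread; the class, the DOUBLE value type, and the registration below are assumptions, and depending on the setup you may additionally need to override getResultType()/getAccumulatorType() so the planner picks up proper MAP type information):

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.table.functions.AggregateFunction;

// Map-based SUM aggregate: the accumulator is itself a map, so a new metric key
// only adds an entry to that map instead of changing the accumulator schema.
public class SumMap extends AggregateFunction<Map<String, Double>, HashMap<String, Double>> {

    @Override
    public HashMap<String, Double> createAccumulator() {
        return new HashMap<>();
    }

    // Invoked per input row; "metrics" is the MAP['key', value, ...] built in the SELECT clause.
    public void accumulate(HashMap<String, Double> acc, Map<String, Double> metrics) {
        if (metrics == null) {
            return;
        }
        for (Map.Entry<String, Double> entry : metrics.entrySet()) {
            Double value = entry.getValue();
            if (value != null) {
                acc.merge(entry.getKey(), value, Double::sum);
            }
        }
    }

    @Override
    public Map<String, Double> getValue(HashMap<String, Double> acc) {
        return acc;
    }
}

Registered for example via tableEnv.registerFunction("sumMap", new SumMap()), the earlier query would then look roughly like SELECT a, sumMap(map['sumB', b, 'sumC', c]) AS metric_map FROM registered_table GROUP BY a, with the per-key sums maintained inside the UDAGG's map accumulator.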

Best, Fabian
