Hey,
My job is built on SQL that is injected as an input to the job, so let's take this example:

    SELECT a, max(b) AS MaxB, max(c) AS MaxC FROM registered_table GROUP BY a

(Side note: to keep the state from growing indefinitely, I'm transforming to a retracted stream and filtering based on a custom trigger.)

To get the output as JSON, I basically created a way to dynamically generate a class and register it with the class loader, so when transforming to the retracted stream I'm doing something like:

    Table result = tableEnv.sqlQuery(sqlExpression);
    tableEnv.toRetractStream(result, Row.class, config)
        .filter(tuple -> tuple.f0)
        .map(new RowToDynamicClassMapper(sqlSelectFields))
        .addSink(..)

This actually works pretty well (though I do need to make sure to register the dynamic class with the class loader whenever the state is loaded).

I'm now looking into "schema evolution" - which basically means what happens when the query is changed (say max(c) is removed, and maybe max(d) is added). I don't know whether that fits the classic "schema evolution" feature or should be thought about differently. Would be happy to get some thoughts.

Thanks!
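Roughly, the mapper idea looks like the following (a simplified, untested sketch only - the real class is generated dynamically, and the class-name constructor parameter and the reflective field copy here are just to illustrate the approach, not my actual implementation):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.types.Row;

    import java.lang.reflect.Field;
    import java.util.List;

    // Sketch: copies each Row field into a dynamically generated POJO by matching
    // the SQL select field names to the generated class's field names.
    public class RowToDynamicClassMapper extends RichMapFunction<Row, Object> {

        private final List<String> sqlSelectFields;  // e.g. ["a", "MaxB", "MaxC"]
        private final String dynamicClassName;       // class generated and registered elsewhere

        public RowToDynamicClassMapper(List<String> sqlSelectFields, String dynamicClassName) {
            this.sqlSelectFields = sqlSelectFields;
            this.dynamicClassName = dynamicClassName;
        }

        @Override
        public Object map(Row row) throws Exception {
            // The dynamic class must already be registered with the user code class loader.
            Class<?> clazz = Class.forName(dynamicClassName, true,
                    getRuntimeContext().getUserCodeClassLoader());
            Object target = clazz.newInstance();
            for (int i = 0; i < sqlSelectFields.size(); i++) {
                Field field = clazz.getDeclaredField(sqlSelectFields.get(i));
                field.setAccessible(true);
                field.set(target, row.getField(i));
            }
            return target;
        }
    }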
Hi Shahar,

I wasn't sure which schema you are describing that is going to "evolve" (is it the registered_table? or the output sink?). It would be great if you could clarify more.

For the example you provided, IMO it is more a logic change than schema evolution:
- If you are changing max(c) to max(d) in your query, I don't think this qualifies as schema evolution.
- If you are adding another column max(d) to your query alongside your existing max(c), that might be considered a backward compatible change.

However, in either case you will have to restart your job. You can also consult the state schema evolution docs [1]; there are other problems that can be tricky as well [2,3].

Thanks,
Rong

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/schema_evolution.html

On Wed, Mar 6, 2019 at 12:52 PM shkob1 <[hidden email]> wrote:
Thanks for the response, Rong. Would be happy to clarify more. So there are two possible changes that could happen: a change in the schema of the incoming data source, or a change in the SQL query itself (which in turn changes the output schema).
Shahar

On Thu, Mar 7, 2019 at 10:40 AM Rong Rong <[hidden email]> wrote:
Hi Shahar,

1. Are you referring to the incoming data source being published as JSON, with a customized Pojo source function / table source that converts it? In that case it is you who maintains the schema evolution support, am I correct? For Avro I think you can refer to [1].
2. If you change the SQL, you will have to recompile and rerun your job. This means the new compilation of the SQL will yield the correct logic to run against your new schema. I don't foresee this being an issue.

For the second problem: yes, it is your customized serialization sink function's responsibility to convert Row into the output class objects. I am not sure if [2] is the piece of code you are looking for if you are using Avro, but you might be able to leverage that. If you are sticking with your own format of generated/dynamic classes, you might have to create that in your custom source/sink table.

Thanks,
Rong

On Thu, Mar 7, 2019 at 11:20 AM Shahar Cizer Kobrinsky <[hidden email]> wrote:
Thanks Rong,
I made a quick test changing the SQL select (adding a select field in the middle), reran the job from a savepoint, and it worked without any errors. I want to make sure I understand at what point the state is stored and how it works.

Let's simplify the scenario and forget my specific case of a dynamically generated Pojo. Let's focus on the generic steps of:

    Source -> register table -> SQL select and group by session -> retracted stream (Row) -> transformToPojo (custom Map function) -> push to sink

And let's assume the SQL select is changed (a field is added somewhere in the middle of the select list). So: we had intermediate results in the old format that are loaded from state into the new Row object in the retracted stream - is that an accurate statement? At what operator/format is the state stored in this case? Is it the SQL result/Row? Is it the Pojo? Since this scenario does not fail for me, I'm trying to understand how/where it is handled in Flink.
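To make the scenario concrete, here is roughly what I mean (a self-contained sketch only - the dummy in-memory source, the TransformToPojo name and the print() sink are placeholders, not the real job):

    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class SimplifiedPipeline {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

            // Source -> register table (a dummy in-memory source stands in for the real one).
            DataStream<Tuple3<String, Integer, Integer>> source =
                    env.fromElements(Tuple3.of("x", 1, 2), Tuple3.of("x", 3, 1));
            tableEnv.registerDataStream("registered_table", source, "a, b, c");

            // SQL select with GROUP BY; the select list is what changes between job versions.
            Table result = tableEnv.sqlQuery(
                    "SELECT a, MAX(b) AS MaxB, MAX(c) AS MaxC FROM registered_table GROUP BY a");

            // Retracted stream of (retractFlag, Row): keep additions only, then map to a Pojo
            // and push to the sink (stubbed out here with print()).
            tableEnv.toRetractStream(result, Row.class)
                    .filter(t -> t.f0)
                    // .map(new TransformToPojo(...))  // hypothetical custom map function
                    // .addSink(...)
                    .print();

            env.execute("simplified pipeline");
        }
    }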
Hi Shahar,

From my understanding, if you use GROUP BY with aggregate functions, they save their accumulators in SQL-internal state, which is invariant to your input schema. Based on what you described, I think that's why recovering from the existing state works.

One confusion you might have is the "toRetractStream" syntax. This actually passes the "retracting" flag to the Flink planner to indicate how the DataStream operators get generated from your SQL. So in my understanding there's really no "state" associated with the "retracting stream" itself, but rather with the generated operators.

However, I am no expert in Table/SQL state recovery: I recall there was an open JIRA [1] that might be related to your question regarding recovery of the SQL/Table-generated operators. Maybe @Fabian can provide more insight here?

Regarding the rest of the pipeline, both the "filter" and "map" operators are stateless; sink state recovery depends on what you do there.

--
Rong

On Fri, Mar 8, 2019 at 12:07 PM shkob1 <[hidden email]> wrote:
Thanks guys, I actually got an error now when adding some fields to the select statement:

    java.lang.RuntimeException: Error while getting state
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
        at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
        at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
        at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible.
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341)
        at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
        at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63)
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241)
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290)
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
        ... 9 more

Does that mean I should move from having a Pojo store the result of the SQL retracted stream to Avro? Trying to understand how to mitigate it.

Thanks

On Sat, Mar 9, 2019 at 4:41 PM Rong Rong <[hidden email]> wrote:
Or is it the SQL state that is incompatible...?

On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky <[hidden email]> wrote:
Hi,

Restarting a changed query from a savepoint is currently not supported. In general this is a very difficult problem, as new queries might result in completely different execution plans. The special case of adding and removing aggregates is easier to solve, but the schema of the stored state changes and we would need to analyze the previous and current query and generate compatible serializers. So far we have not explored this rabbit hole.

Also, starting a different query from a savepoint can lead to weird result semantics. I'd recommend bootstrapping the state of the new query from scratch.

Best, Fabian

On Mon, Mar 18, 2019 at 8:02 PM, Shahar Cizer Kobrinsky <[hidden email]> wrote:
Thanks Fabian,

I'm thinking about how to work around that issue, and one thing that came to mind is to create a map that holds keys & values that can be edited without changing the schema, though I'm still figuring out how to express it in Calcite. Consider the following original SQL, in which "metrics" can be added/deleted/renamed:

    SELECT a, sum(b) AS metric_sum_b, sum(c) AS metric_sum_c GROUP BY a

I'm looking at both json_objectagg and map to change it, but it seems json_objectagg is only in a later Calcite version, and map doesn't work for me. Trying something like

    SELECT a, map(sum(b) AS metric_sum_b, sum(c) AS metric_sum_c) AS metric_map GROUP BY a

results in "Non-query expression encountered in illegal context".

Is my train of thought the right one? If so, do I have a mistake in the way I'm trying to implement it?

Thanks!

On Tue, Mar 19, 2019 at 2:03 AM Fabian Hueske <[hidden email]> wrote:
My bad - it actually did work with

    SELECT a, map['sumB', sum(b), 'sumC', sum(c)] AS metric_map GROUP BY a

Do you think that's OK as a workaround? That way the main schema shouldn't change - only the keys in the map.

On Tue, Mar 19, 2019 at 11:50 AM Shahar Cizer Kobrinsky <[hidden email]> wrote:
Hi,

I think this would work. However, you should be aware that all keys are kept forever in state (unless you configure idle state retention time [1]). This includes previous versions of keys.

Also note that we are not guaranteeing savepoint compatibility across Flink versions yet. If the state of the aggregation operator changes in a later version (say Flink 1.9.x), it might not be possible to migrate to that later Flink version. Compatibility for bugfix releases (1.8.0 -> 1.8.1) is of course provided.

Best,
Fabian

On Tue, Mar 19, 2019 at 10:27 PM, Shahar Cizer Kobrinsky <[hidden email]> wrote:
Hi Fabian,
It seems like it didn't work. Let me specify what I have done: I have a SQL query that looks something like:

    SELECT a, sum(b), map['sum_c', sum(c), 'sum_d', sum(d)] AS my_map FROM ... GROUP BY a

As you said, I'm preventing keys from staying in state forever by using idle state retention time (plus I'm transforming to a retracted stream along with a custom trigger that sends the data to the sink). I tried adding a new item to the map (say 'sum_e', sum(e)), cancelled with a savepoint, reran from that savepoint and got the same error as above about state incompatibility. Why do you think that would happen?

Thanks
Shahar
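Something like the following is what I mean by idle state retention (a sketch only - the helper method and the retention times are placeholders, assuming the Flink 1.7-style StreamQueryConfig API):

    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.StreamQueryConfig;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class RetentionSetup {

        // Sketch: retract stream with idle state retention, so GROUP BY keys that have been
        // inactive for at least the minimum time get cleaned up. The times are placeholders.
        static DataStream<Tuple2<Boolean, Row>> toRetractStreamWithRetention(
                StreamTableEnvironment tableEnv, Table result) {
            StreamQueryConfig queryConfig = tableEnv.queryConfig();
            queryConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));
            return tableEnv.toRetractStream(result, Row.class, queryConfig);
        }
    }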
Debugging locally, it seems like the state descriptor of "GroupAggregateState" is creating an additional field serializer (a TupleSerializer for the SumAccumulator) within the RowSerializer. I'm guessing this is what is causing the incompatibility? Is there any workaround I can do?
Sorry to flood this thread, but to keep documenting my experiments:

So far I've been retracting to a Row and then mapping to a dynamic Pojo that is created (using ByteBuddy) according to the select fields in the SQL. Given the error, I'm now trying to remove the usage of Row and use the dynamic type directly when converting the query result table to a retracted stream. However, since I don't have compile-time knowledge of the select field types (the created Pojo has them as "Object"), I don't think I can create a Kryo serializer for them (I guess by using Row I deferred my problem from compile time to schema-evolution time through the Row -> dynamic object conversion). So I guess I need to find a solution in which I can either:

- figure out how to infer the type of a SQL select field based on the source table somehow (a rough sketch of what I mean follows below), or
- figure out how to create a (Kryo?) serializer that can convert to the dynamic object in a similar way to the RowSerializer (supporting Object fields).

Would love to hear more thoughts.
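For the first option, a sketch of reading the field names and types off the result table's schema (assuming the Flink 1.7 TableSchema API; the helper class and the println are just for illustration):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableSchema;

    public class ResultSchemaInspector {

        // Sketch: read the field names and types of the SQL result table; these could then
        // drive generation of the dynamic class with concrete field types instead of Object.
        static void printResultFields(Table result) {
            TableSchema schema = result.getSchema();
            String[] fieldNames = schema.getFieldNames();
            TypeInformation<?>[] fieldTypes = schema.getFieldTypes();
            for (int i = 0; i < fieldNames.length; i++) {
                System.out.println(fieldNames[i] + " -> " + fieldTypes[i]);
            }
        }
    }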
Hmm, I'm kind of stuck here. It seems like SQL GROUP BY is translated to a GroupAggProcessFunction which stores state for every aggregation element (thus flattening the map items for the state store). It seems like there's no way around it. Am I wrong? Is there any way to evolve the map elements when doing SELECT map['a', sum(a), 'b', sum(b), ...] FROM ... GROUP BY ... ?

On Wed, Mar 20, 2019 at 2:00 AM Fabian Hueske <[hidden email]> wrote:
Hi Shahar,

Sorry for the late response.

The problem is not with the type of the retract stream, but with the GROUP BY aggregation operator. The optimizer converts the plan into an aggregation operator that computes all aggregates, followed by a projection that inserts the aggregation results into a MAP type. The problem is the state of the aggregation operator: by adding a new field to the map, the state of the operator changes and you cannot restore it.

The only workaround I can think of would be to implement a user-defined aggregation function (UDAGG) [1] that performs all aggregations internally and manually maintains state compatibility for its accumulator.

Best,
Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#aggregation-functions

On Thu, Mar 28, 2019 at 10:28 PM, Shahar Cizer Kobrinsky <[hidden email]> wrote:
That makes sense Fabian!
So I want to make sure I fully understand how this should look.
Would the same expression look like:
    custom_groupby(my_group_fields, map['a', sum(a), ...])
?
Will I be able to use the built-in aggregation functions internally, such as sum/avg etc., or would I need to reimplement all such functions?
In terms of schema evolution, if these are implemented as a map state, will I be OK as new items are added to that map?
Thanks again, and congrats on an awesome conference - I learned a lot.
Shahar
Hi Shahar,

Thanks!

The approach of the UDAGG would be very manual; you could not reuse the built-in functions. There are several ways to achieve this. One approach could be to have a map-based UDAGG for each type of aggregation that you'd like to support (SUM, COUNT, ...).

Let's say we have a sumMap function. It could take a MAP(String, Double) as input parameter and produce a MAP(String, Double) as result. Internally, the function would create and maintain a sum aggregate for each unique String key of the map. The same could be done for countMap, minMap, etc. Since the accumulator of the UDAGGs would be a map, it should be state compatible and support a growing number of aggregates. It would not be easily possible (without injecting marker records) to delete aggregates.

I don't think this would be very efficient, but it should work.

Best,
Fabian
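A rough, untested sketch of such a sumMap function, assuming DOUBLE-typed metric values and the Flink 1.7 AggregateFunction API (the class and method names are illustrative only, not a tested implementation):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.table.functions.AggregateFunction;

    // Sketch: sums a MAP<String, Double> input per grouping key, keeping one running sum
    // per map key in the accumulator, so new keys can appear without changing the
    // accumulator's overall type.
    public class SumMap extends AggregateFunction<Map<String, Double>, HashMap<String, Double>> {

        @Override
        public HashMap<String, Double> createAccumulator() {
            return new HashMap<>();
        }

        // Called by the framework for each input row.
        public void accumulate(HashMap<String, Double> acc, Map<String, Double> value) {
            if (value == null) {
                return;
            }
            for (Map.Entry<String, Double> e : value.entrySet()) {
                double v = e.getValue() == null ? 0.0 : e.getValue();
                acc.merge(e.getKey(), v, Double::sum);
            }
        }

        @Override
        public Map<String, Double> getValue(HashMap<String, Double> acc) {
            return acc;
        }
    }

It could then be registered with tableEnv.registerFunction("sumMap", new SumMap()) and called as sumMap(map['sumB', b, 'sumC', c]) in the query; whether its map-typed accumulator actually restores cleanly across added keys would still need to be verified against a savepoint.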