FLINK TABLE API UDAF QUESTION, For heap backends, the new state serializer must not be incompatible


orlando qi
Hello everyone:

    I defined a UDAF while using the Flink Table API to implement an aggregation. The job runs without problems when it is started from scratch on the cluster, but it throws the exception below when it is restarted from a checkpoint. How can I resolve it?

java.lang.RuntimeException: Error while getting state
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible.
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:227)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:270)
at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
... 9 more
groupBy: (risk_id, dev_only_id), select: (risk_id, dev_only_id, UpdateColumn(org_name, evaluation_time_millis) AS TMP_586, UpdateColumn(rule_weight_sum, evaluation_time_millis) AS TMP_581, UpdateColumn(area_code, evaluation_time_millis) AS TMP_583, SUM(rule_risk) AS TMP_580, UpdateColumn(ip, evaluation_time_millis) AS TMP_587, UpdateColumn(evaluation_time_millis, evaluation_time_millis) AS TMP_588, UpdateColumn(area_name, evaluation_time_millis) AS TMP_584, UpdateColumn(evaluation_time, evaluation_time_millis) AS TMP_582, UpdateColumn(org_code, evaluation_time_millis) AS TMP_585) -> select: (risk_id, _UTF-16LE'''' AS risk_name, /(TMP_580, CAST(TMP_581)) AS risk_value, TMP_582 AS evaluation_time, TMP_583 AS area_code, TMP_584 AS area_name, TMP_585 AS org_code, TMP_586 AS org_name, dev_only_id, TMP_587 AS ip, CAST(TMP_581) AS rule_weight_sum, CAST(TMP_588) AS evaluation_time_millis) -> to: Tuple2 -> Filter -> Map -> from: (risk_id, risk_name, risk_value, evaluation_time, area_code, area_name, org_code, org_name, dev_only_id, ip, risk_weight_sum, evaluation_time_millis) (3/6)
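
For reference, here is a minimal sketch of what a UDAF like `UpdateColumn(value, evaluation_time_millis)` in the plan above might look like. The real implementation is not shown in this post, so everything here is an assumption: the semantics (keep the value that arrived with the largest timestamp), the class name `UpdateColumnSketch`, and the accumulator `Acc` are all hypothetical. To keep the sketch runnable without Flink on the classpath, the class does not extend `org.apache.flink.table.functions.AggregateFunction<String, Acc>`; in a real job it would, with the same three methods. The accumulator is written as a plain POJO (public no-arg constructor, public fields), which is the shape Flink's type extraction recognizes as a POJO. Whether the accumulator type is related to the exception above is not confirmed.

```java
// Hypothetical sketch only: not the job's actual UDAF.
// In a Flink job this class would extend
// org.apache.flink.table.functions.AggregateFunction<String, UpdateColumnSketch.Acc>.
final class UpdateColumnSketch {

    /** Accumulator written as a POJO: public no-arg constructor, public fields. */
    public static class Acc {
        public String value;                    // latest value seen so far
        public long timestamp = Long.MIN_VALUE; // timestamp that came with it

        public Acc() {}
    }

    /** Creates an empty accumulator for a new group. */
    public Acc createAccumulator() {
        return new Acc();
    }

    /** Keeps the value whose timestamp is the largest seen so far (assumed semantics). */
    public void accumulate(Acc acc, String value, long timestamp) {
        if (timestamp >= acc.timestamp) {
            acc.value = value;
            acc.timestamp = timestamp;
        }
    }

    /** Returns the aggregation result for the group. */
    public String getValue(Acc acc) {
        return acc.value;
    }

    public static void main(String[] args) {
        UpdateColumnSketch f = new UpdateColumnSketch();
        Acc acc = f.createAccumulator();
        f.accumulate(acc, "org-a", 100L);
        f.accumulate(acc, "org-b", 200L);
        f.accumulate(acc, "org-c", 150L); // older than org-b, so it is ignored
        System.out.println(f.getValue(acc)); // prints "org-b"
    }
}
```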
--

orlando
Fwd: FLINK TABLE API UDAF QUESTION, For heap backends, the new state serializer must not be incompatible

orlando qi


---------- Forwarded message ---------
From: orlando qi <[hidden email]>
Date: Fri, Aug 23, 2019 at 10:44 AM
Subject: FLINK TABLE API UDAF QUESTION, For heap backends, the new state serializer must not be incompatible
To: <[hidden email]>

