java.lang.RuntimeException: Error while getting state
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible.
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:227)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:270)
at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
... 9 moregroupBy: (risk_id, dev_only_id), select: (risk_id, dev_only_id, UpdateColumn(org_name, evaluation_time_millis) AS TMP_586, UpdateColumn(rule_weight_sum, evaluation_time_millis) AS TMP_581, UpdateColumn(area_code, evaluation_time_millis) AS TMP_583, SUM(rule_risk) AS TMP_580, UpdateColumn(ip, evaluation_time_millis) AS TMP_587, UpdateColumn(evaluation_time_millis, evaluation_time_millis) AS TMP_588, UpdateColumn(area_name, evaluation_time_millis) AS TMP_584, UpdateColumn(evaluation_time, evaluation_time_millis) AS TMP_582, UpdateColumn(org_code, evaluation_time_millis) AS TMP_585) -> select: (risk_id, _UTF-16LE'''' AS risk_name, /(TMP_580, CAST(TMP_581)) AS risk_value, TMP_582 AS evaluation_time, TMP_583 AS area_code, TMP_584 AS area_name, TMP_585 AS org_code, TMP_586 AS org_name, dev_only_id, TMP_587 AS ip, CAST(TMP_581) AS rule_weight_sum, CAST(TMP_588) AS evaluation_time_millis) -> to: Tuple2 -> Filter -> Map -> from: (risk_id, risk_name, risk_value, evaluation_time, area_code, area_name, org_code, org_name, dev_only_id, ip, risk_weight_sum, evaluation_time_millis) (3/6)
Free forum by Nabble | Edit this page |