Hi Experts,
In my self-defined UDAF, I found that returning a null value causes the checkpoint to fail. I think returning null from a UDAF is quite a common case, because sometimes no value can be determined. Why does Flink have such a limitation on UDAF return values? Thanks a lot!

The error log is as follows:

org.apache.flink.streaming.runtime.tasks.AsynchronousException: java.lang.Exception: Could not materialize checkpoint 4 for operator groupBy: (DRAFT_ORDER_ID), select: (DRAFT_ORDER_ID, latest_value_long_test($f1, LAST_UPDATE_TIME) AS tt) -> select: (CAST(DRAFT_ORDER_ID) AS EXPR$0, _UTF-16LE'order' AS EXPR$1, tt, _UTF-16LE'111' AS EXPR$3) -> to: Tuple2 -> Sink: Unnamed (2/4).
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 4 for operator groupBy: (DRAFT_ORDER_ID), select: (DRAFT_ORDER_ID, latest_value_long_test($f1, LAST_UPDATE_TIME) AS tt) -> select: (CAST(DRAFT_ORDER_ID) AS EXPR$0, _UTF-16LE'order' AS EXPR$1, tt, _UTF-16LE'111' AS EXPR$3) -> to: Tuple2 -> Sink: Unnamed (2/4).
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
	... 6 common frames omitted
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.types.NullFieldException: Field 0 is null, but expected to hold a value.
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
	... 5 common frames omitted
Caused by: org.apache.flink.types.NullFieldException: Field 0 is null, but expected to hold a value.
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:116)
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
	at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:161)
	at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:47)
	at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.lambda$getKeyGroupWriter$0(CopyOnWriteStateTableSnapshot.java:148)
	at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResult.writeStateInKeyGroup(KeyGroupPartitioner.java:261)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.callInternal(HeapKeyedStateBackend.java:757)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.callInternal(HeapKeyedStateBackend.java:724)
	at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
	... 7 common frames omitted
Caused by: java.lang.NullPointerException: null
	at org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:69)
	at org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:32)
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
	... 17 common frames omitted

Best,
Henry
Hi Henry,
could you share a small reproducible example? From what I see, you are
using a custom aggregate function with a case class inside, right?
Flink's case class serializer does not support null fields, because
using `null` is also not very idiomatic in Scala.
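To make the point about null fields concrete, here is a minimal Java sketch (no Flink dependency) of why a null field breaks a field-by-field serializer. `LatestAcc` and `NonNullFieldWriter` are hypothetical names: `LatestAcc` stands in for a case-class accumulator with a boxed `java.lang.Long` field, and `NonNullFieldWriter` stands in for the per-field long serializer at the bottom of the trace. The assumption is that, as there, each field is written as a primitive `long` with no null marker.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a Scala case class accumulator such as
// case class LatestAcc(value: java.lang.Long, time: java.lang.Long)
final class LatestAcc {
    final Long value; // may be null when no value could be determined
    final Long time;

    LatestAcc(Long value, Long time) {
        this.value = value;
        this.time = time;
    }
}

// Hypothetical serializer: writes every field as a primitive long,
// with no null marker (assumed to mirror the failing path in the trace).
final class NonNullFieldWriter {
    static byte[] serialize(LatestAcc acc) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writeLong(acc.value, out); // throws NullPointerException if value == null
            writeLong(acc.time, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    private static void writeLong(Long record, DataOutputStream out) throws IOException {
        out.writeLong(record); // auto-unboxing a null Long throws NullPointerException
    }
}
```

Serializing `new LatestAcc(null, 1000L)` fails with a NullPointerException during auto-unboxing, the same kind of root cause the trace reports in `LongSerializer.serialize`. With the heap state backend used here, state objects are only serialized when a snapshot is taken, which is why the problem shows up at checkpoint time rather than when the UDAF produces its value.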
Use a `Row` type to support nulls properly.
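By contrast, a serializer can support nulls by recording them explicitly. The Java sketch below (no Flink dependency) writes a null mask in front of the fields and only hands non-null values to the long writer. `NullMaskCodec` is a hypothetical name, and the byte layout is a simplified illustration of the idea, not Flink's actual `RowSerializer` format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical null-aware codec: a leading null mask records which fields
// are null, and only non-null fields are written as longs.
// Simplified illustration only; NOT Flink's RowSerializer wire format.
final class NullMaskCodec {
    static byte[] encode(Long[] fields) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(fields.length); // arity
            for (Long f : fields) {
                out.writeBoolean(f == null); // one null-mask flag per field
            }
            for (Long f : fields) {
                if (f != null) {
                    out.writeLong(f); // only non-null values reach the long writer
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Long[] decode(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int arity = in.readInt();
            boolean[] isNull = new boolean[arity];
            for (int i = 0; i < arity; i++) {
                isNull[i] = in.readBoolean();
            }
            Long[] fields = new Long[arity];
            for (int i = 0; i < arity; i++) {
                // Null fields are restored without reading any value bytes.
                fields[i] = isNull[i] ? null : in.readLong();
            }
            return fields;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For `{null, 1000L}` the encoding is 14 bytes (a 4-byte arity, two 1-byte null flags and one 8-byte long), and decoding returns the `null` unchanged. As far as I know, Flink's `RowSerializer` also tracks null fields with a mask, which is why a `Row` can carry nulls where a case class cannot.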
Hope this helps.
Timo
On 30.01.19 at 12:35, 徐涛 wrote:
Hi Experts,