I'm using Flink 1.9 on Mesos and am trying to use my own trigger and evictor. The state is stored in memory.

    input.setParallelism(processParallelism)
      .assignTimestampsAndWatermarks(new UETimeAssigner)
      .keyBy(_.key)
      .window(TumblingEventTimeWindows.of(Time.minutes(20)))
      .trigger(new MyTrigger)
      .evictor(new MyEvictor)
      .process(new MyFunction).setParallelism(aggregateParallelism)
      .addSink(kafkaSink).setParallelism(sinkParallelism)
      .name("kafka-record-sink")

The exception stack trace is below. Could anyone help with this? Thanks!

    java.lang.Exception: Could not materialize checkpoint 1 for operator Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor, ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.util.concurrent.ExecutionException: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
        at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
        ... 3 more
    Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
        at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:114)
        at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
        at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
        at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
        at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
        ... 5 more

Best regards
Sili Liu |
Thanks for reporting this. It looks like the window namespace was replaced by VoidNamespace in the state entry. I've created https://issues.apache.org/jira/browse/FLINK-18464 to investigate further.

Regards,
Roman

On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <[hidden email]> wrote:
The RocksDB backend has the same problem.

On Thu, Jul 2, 2020 at 6:11 PM Khachatryan Roman <[hidden email]> wrote:
Best regards
Sili Liu |
Thanks for the clarification. Can you also share the code of the other parts, particularly MyFunction?

Regards,
Roman

On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu <[hidden email]> wrote:
Hi, this is our production code, so I had to modify it a little, such as the variable and function names. I think the three classes I provide here are enough. I am trying to join two streams, but I don't want to use the default join function, because I want to send the joined log immediately and remove it from the window state immediately. My window is also very long (20 minutes), so it may be evaluated multiple times.

    class JoinFunction extends
    class JoinTrigger extends Trigger[RawLog, TimeWindow] {
    class JoinEvictor extends Evictor[RawLog, TimeWindow] {

On Thu, Jul 2, 2020 at 7:18 PM Khachatryan Roman <[hidden email]> wrote:
Best regards
Sili Liu |
Hi,

Thanks for the details. However, I was not able to reproduce the issue. I used a parallelism of 4 and the file system backend, and tried different timings for checkpointing, windowing and the source. Do you encounter this problem deterministically, and is it always the first checkpoint? What checkpointing interval do you use?

Regards,
Roman

On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu <[hidden email]> wrote:
Hi,

Thanks for your help. The checkpoint configuration is:

    checkpoint.intervalMS=300000
    checkpoint.timeoutMS=300000

The error call stack is from the JM's log, and it occurs on every checkpoint. I have not had a successful checkpoint yet.

On Fri, Jul 3, 2020 at 3:50 AM Khachatryan Roman <[hidden email]> wrote:
Best regards
Sili Liu |
I still wasn't able to reproduce the issue. Can you also clarify:

- Are you starting the job from a savepoint or externalized checkpoint?
- If yes, was the job graph changed?
- What StreamTimeCharacteristic is set, if any?
- What exact version of Flink do you use?

Regards,
Roman

On Fri, Jul 3, 2020 at 6:38 AM Si-li Liu <[hidden email]> wrote:
Thanks for your help.

1. I started the job from scratch, not from a savepoint or externalized checkpoint.
2. No job graph change.
3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
4. My Flink version is 1.9.1.

On Fri, Jul 3, 2020 at 4:49 PM Khachatryan Roman <[hidden email]> wrote:
Best regards
Sili Liu |
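
For anyone trying to reproduce the report, the settings mentioned so far can be collected in one place. This is only a sketch of how they might be applied: the thread does not show how the job reads `checkpoint.intervalMS` and `checkpoint.timeoutMS`, so mapping them onto the calls below is an assumption, and `ReproSettings` is a hypothetical name.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Hypothetical helper; the name and structure are not from the thread.
object ReproSettings {
  def configure(): StreamExecutionEnvironment = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Reported in the thread: event time is used.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Assumed mapping of checkpoint.intervalMS=300000 (5 minutes).
    env.enableCheckpointing(300000L)
    // Assumed mapping of checkpoint.timeoutMS=300000 (5 minutes).
    env.getCheckpointConfig.setCheckpointTimeout(300000L)
    env
  }
}
```

The state backend is left at its default here; the thread reports the failure with both in-memory state and RocksDB.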
Hi,

First, could you please check whether the problem still occurs with Flink 1.10 or 1.11? Second, could you please try replacing MyFunction with a reduce/aggregate/fold/apply() function to see what happens? This would help narrow down the problem.

Best,
Congxian

On Fri, Jul 3, 2020 at 6:44 PM Si-li Liu <[hidden email]> wrote:
Sorry, I can't reproduce it with reduce/aggregate/fold/apply, and due to some limitations in my working environment, I can't use Flink 1.10 or 1.11.

On Sun, Jul 5, 2020 at 6:21 PM Congxian Qiu <[hidden email]> wrote:
Best regards
Sili Liu |
Hi Si-li,

I just want to double-check: has the original problem been solved? I see that the issue FLINK-18464 has been closed with the reason "can not reproduce". Am I missing something here?

Best,
Congxian

On Fri, Jul 10, 2020 at 6:06 PM Si-li Liu <[hidden email]> wrote:
Someone told me that this issue may be Mesos-specific. I'm fairly new to Flink, and I dug into the code but could not reach a conclusion. All I want is a better JoinWindow that emits the result and deletes it from the window state immediately once the join succeeds. Is there any other way? Thanks!

On Sat, Jul 11, 2020 at 3:20 PM Congxian Qiu <[hidden email]> wrote:
Best regards
Sili Liu |
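
The behaviour asked for in the last message (emit as soon as both sides have arrived, and drop the matched entry right away) can be sketched without any window, trigger or evictor. The block below is a plain-Scala model with no Flink dependency, not an answer given in the thread. `Log`, `Joined` and `ImmediateJoinBuffer` are hypothetical names, and a one-to-one join per key is assumed. In a Flink job, logic of this shape would typically live in keyed state inside a function over two connected keyed streams (for example a `CoProcessFunction`), with a timer playing the role of `expire`; that wiring is not shown here.

```scala
import scala.collection.mutable

// Hypothetical record shapes; the thread only names RawLog.
final case class Log(key: String, payload: String, timestampMs: Long)
final case class Joined(key: String, left: String, right: String)

// Buffers records that are still waiting for a partner, one per key and side.
final class ImmediateJoinBuffer {
  private val pendingLeft = mutable.Map.empty[String, Log]
  private val pendingRight = mutable.Map.empty[String, Log]

  // If the right side is already buffered, emit the join and remove the
  // partner so it cannot be joined again; otherwise buffer the left record.
  def onLeft(left: Log): Option[Joined] =
    pendingRight.remove(left.key) match {
      case Some(right) => Some(Joined(left.key, left.payload, right.payload))
      case None =>
        pendingLeft.update(left.key, left)
        None
    }

  // Mirror image of onLeft for records arriving on the right stream.
  def onRight(right: Log): Option[Joined] =
    pendingLeft.remove(right.key) match {
      case Some(left) => Some(Joined(right.key, left.payload, right.payload))
      case None =>
        pendingRight.update(right.key, right)
        None
    }

  // Drops unmatched records older than the cutoff, standing in for the
  // cleanup that the 20-minute window would otherwise perform.
  def expire(cutoffMs: Long): Unit = {
    dropStale(pendingLeft, cutoffMs)
    dropStale(pendingRight, cutoffMs)
  }

  def pendingCount: Int = pendingLeft.size + pendingRight.size

  private def dropStale(buf: mutable.Map[String, Log], cutoffMs: Long): Unit = {
    val stale = buf.collect { case (k, log) if log.timestampMs < cutoffMs => k }.toList
    buf --= stale
  }
}
```

Because the matched entry is removed in the same step that emits the result, each pair is produced at most once and nothing stays in the buffer after a successful join, which is the property the custom trigger and evictor were meant to provide.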