Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace


Si-li Liu
I'm using Flink 1.9 on Mesos with my own trigger and evictor. The state is stored in memory.

input.setParallelism(processParallelism)
        .assignTimestampsAndWatermarks(new UETimeAssigner)
        .keyBy(_.key)
        .window(TumblingEventTimeWindows.of(Time.minutes(20)))
        .trigger(new MyTrigger)
        .evictor(new MyEvictor)
        .process(new MyFunction).setParallelism(aggregateParallelism)
        .addSink(kafkaSink).setParallelism(sinkParallelism)
        .name("kafka-record-sink")

Here is the exception stack trace. Could anyone help with this? Thanks!

java.lang.Exception: Could not materialize checkpoint 1 for operator Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor, ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
    ... 3 more
Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
    at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:114)
    at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
    at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
    at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
    ... 5 more

--
Best regards

Sili Liu

Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

r_khachatryan
Thanks for reporting this.

It looks like the window namespace was replaced by VoidNamespace in the state entry.
I've created https://issues.apache.org/jira/browse/FLINK-18464 to further investigate it.
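To make the failure mode in the stack trace more concrete, here is a toy model in plain Scala. It is not Flink code: `ToyVoidNamespace`, `ToyTimeWindow`, and `ToySerializer` are hypothetical stand-ins. The sketch only assumes that a serializer typed for one namespace class is handed an object of another class once generic type information has been erased.

```scala
// Toy model only: none of these types are Flink classes.
final class ToyVoidNamespace
final case class ToyTimeWindow(start: Long, end: Long)

trait ToySerializer[N] {
  def serialize(namespace: N): String
}

// A serializer that only knows how to write the "void" namespace.
object ToyVoidNamespaceSerializer extends ToySerializer[ToyVoidNamespace] {
  override def serialize(namespace: ToyVoidNamespace): String = "void"
}

object NamespaceMismatchDemo {
  // Generic types are erased at runtime, so nothing stops a caller from
  // passing a namespace object the serializer was never meant to handle.
  // The cast to the concrete namespace class only fails inside serialize.
  def snapshot(serializer: ToySerializer[_], namespace: Any): Either[String, String] = {
    val erased = serializer.asInstanceOf[ToySerializer[Any]]
    try Right(erased.serialize(namespace))
    catch { case _: ClassCastException => Left("ClassCastException") }
  }

  def main(args: Array[String]): Unit = {
    // Matching namespace type: serialization succeeds.
    println(snapshot(ToyVoidNamespaceSerializer, new ToyVoidNamespace))
    // Window-typed namespace with a void-namespace serializer: the cast fails.
    println(snapshot(ToyVoidNamespaceSerializer, ToyTimeWindow(0L, 1200000L)))
  }
}
```

In this model the mismatch only surfaces when the entry is serialized, which would be consistent with the exception appearing during the checkpoint rather than when the state is written.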

Regards,
Roman



Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

Si-li Liu
The RocksDB backend has the same problem.


Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

r_khachatryan
Thanks for the clarification.

Can you also share the code of other parts, particularly MyFunction?

Regards,
Roman



Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

Si-li Liu
Hi, this is our production code, so I had to modify it a little, for example by renaming variables and functions. I think the three classes I provide here are enough.

I'm trying to join two streams, but I don't want to use the default join function, because I want to emit the joined log immediately and remove it from the window state right away. My window is very long (20 minutes), so it may be evaluated multiple times.
class JoinFunction extends ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow] {

  var ueState: ValueState[RawLog] = _
  @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
  val invalidCounter = new LongCounter()
  val processCounter = new LongCounter()
  val sendToKafkaCounter = new LongCounter()

  override def open(parameters: Configuration): Unit = {
    ueState = getRuntimeContext.getState(
      new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
    )
    gZipThriftSerializer = new GZipThriftSerializer[MyType]()
    getRuntimeContext.addAccumulator("processCounter", this.processCounter)
    getRuntimeContext.addAccumulator("invalidCounter", this.invalidCounter)
    getRuntimeContext.addAccumulator("sendToKafkaCounter", this.sendToKafkaCounter)
  }

  override def process(key: String,
                       ctx: Context,
                       logs: Iterable[RawLog],
                       out: Collector[OutputLog]): Unit = {
    if (ueState.value() != null) {
      processCounter.add(1L)
      val bid = ueState.value()
      val bidLog = gZipThriftSerializer.decompressAndDeserialize(bid.payload, classOf[MyType])
      logs.foreach( log => {
        if (log.eventType == SHOW) {
          val showLog = gZipThriftSerializer.decompressAndDeserialize(log.payload, classOf[MyType])
          sendToKafkaCounter.add(1L)
          out.collect(new OutputLog(ThriftUtils.serialize(showLog), Utils.getOutputTopic(showLog)))
        }
      })
    } else {
      invalidCounter.add(1L)
    }
  }
}

class JoinTrigger extends Trigger[RawLog, TimeWindow] {

  override def onElement(log: RawLog,
                         timestamp: Long,
                         window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    val ueState: ValueState[RawLog] = ctx.getPartitionedState(
      new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
    )
    val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
      new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))

    if (!firstSeen.value()) {
      ctx.registerEventTimeTimer(window.getEnd)
      firstSeen.update(true)
    }
    val eventType = log.eventType
    if (eventType == BID) {
      ueState.update(log)
      TriggerResult.CONTINUE
    } else {
      if (ueState.value() == null) {
        TriggerResult.CONTINUE
      } else {
        TriggerResult.FIRE
      }
    }
  }

  override def onEventTime(timestamp: Long,
                           window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult = {
    if (timestamp == window.getEnd) {
      TriggerResult.PURGE
    }
    TriggerResult.CONTINUE
  }

  override def onProcessingTime(timestamp: Long,
                                window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }

  override def clear(window: TimeWindow,
                     ctx: Trigger.TriggerContext): Unit = {
    val ueState: ValueState[RawLog] = ctx.getPartitionedState(
      new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
    )
    ueState.clear()

    val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
      new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
    firstSeen.clear()

    ctx.deleteEventTimeTimer(window.getEnd)
  }
}

class JoinEvictor extends Evictor[RawLog, TimeWindow] {

  override def evictBefore(elements: JIterable[TimestampedValue[RawLog]],
                           size: Int,
                           window: TimeWindow,
                           evictorContext: Evictor.EvictorContext): Unit = {}

  override def evictAfter(elements: JIterable[TimestampedValue[RawLog]],
                          size: Int,
                          window: TimeWindow,
                          evictorContext: Evictor.EvictorContext): Unit = {
    val iter = elements.iterator()
    while (iter.hasNext) {
      iter.next()
      iter.remove()
    }
  }
}
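
A side note on `onEventTime` in the trigger, separate from the checkpoint failure: in Scala, an `if` block that is not the last expression of a method has its value discarded, so that method appears to return `CONTINUE` in every case. The sketch below shows the effect with a hypothetical `ToyResult` type standing in for `TriggerResult`. That `PURGE` at the window end is the intended behaviour is an assumption.

```scala
// Hypothetical stand-in for Flink's TriggerResult, to keep the sketch self-contained.
sealed trait ToyResult
case object Continue extends ToyResult
case object Purge extends ToyResult

object DiscardedIfDemo {
  // Same shape as the onEventTime method: the value of the `if` block is
  // thrown away, and the last expression (Continue) is what gets returned.
  def asWritten(timestamp: Long, windowEnd: Long): ToyResult = {
    if (timestamp == windowEnd) {
      Purge
    }
    Continue
  }

  // With an explicit else branch, the if/else is the method's result.
  def withElse(timestamp: Long, windowEnd: Long): ToyResult =
    if (timestamp == windowEnd) Purge else Continue

  def main(args: Array[String]): Unit = {
    println(asWritten(1200000L, 1200000L))
    println(withElse(1200000L, 1200000L))
  }
}
```

The compiler usually flags the first variant with a warning about a pure expression in statement position, which is an easy way to spot this pattern.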


Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

r_khachatryan
Hi,

Thanks for the details.
However, I was not able to reproduce the issue. I used a parallelism of 4 and the file system backend, and tried different timings for checkpointing, windowing, and the source.
Do you encounter this problem deterministically? Is it always the first checkpoint?
What checkpointing interval do you use?

Regards,
Roman



Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

Si-li Liu
Hi, thanks for your help.

The checkpoint configuration is:

checkpoint.intervalMS=300000
checkpoint.timeoutMS=300000

The error call stack is from the JobManager's log, and the error occurs on every checkpoint. So far I have not had a single successful checkpoint.
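
For reference, the two settings above work out to a 5-minute checkpoint interval and a 5-minute timeout, against the 20-minute window from the first post. The sketch below does that conversion in plain Scala. The keys `checkpoint.intervalMS` and `checkpoint.timeoutMS` are this job's own configuration names as quoted above. Loading them as Java properties is an assumption made for illustration.

```scala
import java.io.StringReader
import java.time.Duration
import java.util.Properties

object CheckpointSettingsDemo {
  // The two values quoted in the post, in Java properties syntax.
  val raw: String =
    """checkpoint.intervalMS=300000
      |checkpoint.timeoutMS=300000
      |""".stripMargin

  def load(text: String): Properties = {
    val props = new Properties()
    props.load(new StringReader(text))
    props
  }

  // Reads a millisecond value stored under the given key.
  def millis(props: Properties, key: String): Duration =
    Duration.ofMillis(props.getProperty(key).toLong)

  val props: Properties = load(raw)
  val interval: Duration = millis(props, "checkpoint.intervalMS")
  val timeout: Duration = millis(props, "checkpoint.timeoutMS")

  // Window size from the first post: Time.minutes(20).
  val window: Duration = Duration.ofMinutes(20)

  // Roughly how many checkpoints are triggered during one window.
  val checkpointsPerWindow: Long = window.toMillis / interval.toMillis

  def main(args: Array[String]): Unit = {
    println(s"interval=${interval.toMinutes} min, timeout=${timeout.toMinutes} min")
    println(s"checkpoints per window: $checkpointsPerWindow")
  }
}
```

In a Flink job, values like these would typically be passed to `StreamExecutionEnvironment.enableCheckpointing` and `CheckpointConfig.setCheckpointTimeout`. With an interval shorter than the window, checkpoints would normally be taken while window state is still open.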

Khachatryan Roman <[hidden email]> 于2020年7月3日周五 上午3:50写道:
Hi,

Thanks for the details.
However, I was not able to reproduce the issue. I used parallelism levels 4, file system backend and tried different timings for checkpointing, windowing and source.
Do you encounter this problem deterministically, is it always 1st checkpoint? 
What checkpointing interval do you use?

Regards,
Roman


On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu <[hidden email]> wrote:
Hi, this is our production code so I have to modify it a little bit, such as variable name and function name. I think 3 classes I provide here is enough.

I try to join two streams, but I don't want to use the default join function, because I want to send the joined log immediately and remove it from window state immediately. And my window gap time is very long( 20 minutes), so it maybe evaluate it multiple times.

class JoinFunction extends
  ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow] {

  var ueState: ValueState[RawLog] = _
  @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
  val invalidCounter = new LongCounter()
  val processCounter = new LongCounter()
  val sendToKafkaCounter = new LongCounter()

  override def open(parameters: Configuration): Unit = {
    ueState = getRuntimeContext.getState(
      new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
    )
    gZipThriftSerializer = new GZipThriftSerializer[MyType]()
    getRuntimeContext.addAccumulator("processCounter", this.processCounter)
    getRuntimeContext.addAccumulator("invalidCounter", this.invalidCounter)
    getRuntimeContext.addAccumulator("sendToKafkaCounter", this.sendToKafkaCounter)
  }

  override def process(key: String,
                       ctx: Context,
                       logs: Iterable[RawLog],
                       out: Collector[OutputLog]): Unit = {
    if (ueState.value() != null) {
      processCounter.add(1L)
      val bid = ueState.value()
      val bidLog = gZipThriftSerializer.decompressAndDeserialize(bid.payload, classOf[MyType])
      logs.foreach( log => {
        if (log.eventType == SHOW) {
          val showLog = gZipThriftSerializer.decompressAndDeserialize(log.payload, classOf[MyType])
          sendToKafkaCounter.add(1L)
          out.collect(new OutputLog(ThriftUtils.serialize(showLog), Utils.getOutputTopic(showLog)))
        }
      })
    } else {
      invalidCounter.add(1L)
    }
  }
}

class JoinTrigger extends Trigger[RawLog, TimeWindow] {

  override def onElement(log: RawLog,
                         timestamp: Long,
                         window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    val ueState: ValueState[RawLog] = ctx.getPartitionedState(
      new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
    )
    val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
      new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))

    if (!firstSeen.value()) {
      ctx.registerEventTimeTimer(window.getEnd)
      firstSeen.update(true)
    }
    val eventType = log.eventType
    if (eventType == BID) {
      ueState.update(log)
      TriggerResult.CONTINUE
    } else {
      if (ueState.value() == null) {
        TriggerResult.CONTINUE
      } else {
        TriggerResult.FIRE
      }
    }
  }

  override def onEventTime(timestamp: Long,
                           window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult = {
    if (timestamp == window.getEnd) {
      TriggerResult.PURGE
    }
    TriggerResult.CONTINUE
  }

  override def onProcessingTime(timestamp: Long,
                                window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }

  override def clear(window: TimeWindow,
                     ctx: Trigger.TriggerContext): Unit = {
    val ueState: ValueState[RawLog] = ctx.getPartitionedState(
      new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
    )
    ueState.clear()

    val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
      new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
    firstSeen.clear()

    ctx.deleteEventTimeTimer(window.getEnd)
  }
}

class JoinEvictor extends Evictor[RawLog, TimeWindow] {

  override def evictBefore(elements: JIterable[TimestampedValue[RawLog]],
                           size: Int,
                           window: TimeWindow,
                           evictorContext: Evictor.EvictorContext): Unit = {}

  override def evictAfter(elements: JIterable[TimestampedValue[RawLog]],
                          size: Int,
                          window: TimeWindow,
                          evictorContext: Evictor.EvictorContext): Unit = {
    val iter = elements.iterator()
    while (iter.hasNext) {
      iter.next()
      iter.remove()
    }
  }
}
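
A side note on `onEventTime` in `JoinTrigger` above, unrelated to the checkpoint exception: in Scala an `if` without an `else` that is not the last expression of a method has its value discarded, so the method as posted always returns `TriggerResult.CONTINUE`. The sketch below shows this with stand-in types (the `Result`, `Continue` and `Purge` names are hypothetical, not Flink's `TriggerResult`), assuming the intent was to return `PURGE` when the timer fires at the window end.

```scala
object TriggerResultSketch {
  // Stand-ins for Flink's TriggerResult values; not the real enum.
  sealed trait Result
  case object Continue extends Result
  case object Purge extends Result

  // Same shape as the posted onEventTime: the `if` has no `else`, so its
  // value is discarded and the result is always the last expression.
  def asWritten(timestamp: Long, windowEnd: Long): Result = {
    if (timestamp == windowEnd) {
      Purge
    }
    Continue
  }

  // Variant where the `if`/`else` is itself the last expression.
  def withElse(timestamp: Long, windowEnd: Long): Result =
    if (timestamp == windowEnd) Purge else Continue
}
```

Calling `asWritten(10L, 10L)` yields `Continue`, while `withElse(10L, 10L)` yields `Purge`.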

On Thu, Jul 2, 2020 at 7:18 PM Khachatryan Roman <[hidden email]> wrote:
Thanks for the clarification.

Can you also share the code of other parts, particularly MyFunction?

Regards,
Roman


On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu <[hidden email]> wrote:
The RocksDB backend has the same problem.

On Thu, Jul 2, 2020 at 6:11 PM Khachatryan Roman <[hidden email]> wrote:
Thanks for reporting this.

It looks like the window namespace was replaced by VoidNamespace in a state entry.
I've created https://issues.apache.org/jira/browse/FLINK-18464 to further investigate it.

Regards,
Roman


On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <[hidden email]> wrote:
I'm using Flink 1.9 on Mesos, and I'm trying to use my own trigger and evictor. The state is stored in memory.

input.setParallelism(processParallelism)
        .assignTimestampsAndWatermarks(new UETimeAssigner)
        .keyBy(_.key)
        .window(TumblingEventTimeWindows.of(Time.minutes(20)))
        .trigger(new MyTrigger)
        .evictor(new MyEvictor)
        .process(new MyFunction).setParallelism(aggregateParallelism)
        .addSink(kafkaSink).setParallelism(sinkParallelism)
        .name("kafka-record-sink")

Here is the exception stack trace. Could anyone help with this? Thanks!

java.lang.Exception: Could not materialize checkpoint 1 for operator Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor, ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
    ... 3 more
Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
    at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:114)
    at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
    at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
    at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
    ... 5 more

--
Best regards

Sili Liu



Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

r_khachatryan
I still wasn't able to reproduce the issue. 

Can you also clarify:
- Are you starting the job from a savepoint or externalized checkpoint? 
- If yes, was the job graph changed?
- What StreamTimeCharacteristic is set, if any?
- What exact version of Flink do you use?

Regards,
Roman



Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

Si-li Liu
Thanks for your help

1. I started the job from scratch, not a savepoint or externalized checkpoint
2. No job graph change
3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
4. My Flink version is 1.9.1


Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

Congxian Qiu
Hi

First, could you please check whether this problem still occurs with Flink 1.10 or 1.11?

It seems strange. From the error message, the failure occurs when trying to convert a non-window state (VoidNamespace) to a window state (the serializer is that of a window state, but the state is a non-window state).
Could you please try replacing MyFunction with a reduce/aggregate/fold/apply() function to see what happens? This would help narrow down the problem.

Best,
Congxian
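
To make the namespace mismatch concrete: in the stack trace, the failing frame is `VoidNamespaceSerializer.serialize`, which received a `TimeWindow`. The toy model below shows how that kind of `ClassCastException` can arise when a serializer registered for one namespace type is later asked to write entries stored under another. It is only a sketch: every name in it (`NsSerializer`, `StateTable`, `Backend`, and so on) is hypothetical, and it does not reproduce Flink's state backend. The state name `bidState` is borrowed from the posted code, where both `JoinFunction.open` and `JoinTrigger` create a descriptor with that name; whether that is what happens in the job is an assumption the thread has not confirmed at this point.

```scala
import scala.collection.mutable
import scala.util.Try

object NamespaceSketch {
  // Toy stand-ins for namespace types and their serializers; not Flink classes.
  trait NsSerializer[N] { def serialize(ns: N): String }

  final class VoidNs
  final case class Window(start: Long, end: Long)

  object VoidNsSerializer extends NsSerializer[VoidNs] {
    def serialize(ns: VoidNs): String = "void"
  }
  object WindowSerializer extends NsSerializer[Window] {
    def serialize(ns: Window): String = s"${ns.start}-${ns.end}"
  }

  // One table per state name. It remembers a single namespace serializer,
  // while entries are stored under untyped namespace keys.
  final class StateTable(val nsSerializer: NsSerializer[Any]) {
    val entries: mutable.Map[Any, String] = mutable.Map.empty
  }

  final class Backend {
    private val tables = mutable.Map.empty[String, StateTable]

    // The serializer from the first registration of a name is kept;
    // later accesses under the same name reuse that table.
    def put[N](name: String, ser: NsSerializer[N], ns: N, value: String): Unit = {
      val table = tables.getOrElseUpdate(
        name, new StateTable(ser.asInstanceOf[NsSerializer[Any]]))
      table.entries(ns) = value
    }

    // A "checkpoint": serialize every namespace with the table's serializer.
    def snapshot(): List[String] =
      tables.values.toList.flatMap { t =>
        t.entries.keys.toList.map(k => t.nsSerializer.serialize(k))
      }
  }

  // Register "bidState" without a window first, then write it under a window.
  def demo(): Try[List[String]] = {
    val backend = new Backend
    backend.put("bidState", VoidNsSerializer, new VoidNs, "non-window access")
    backend.put("bidState", WindowSerializer, Window(0L, 1200000L), "window access")
    Try(backend.snapshot())
  }
}
```

In this model the writes succeed and the failure only shows up in `snapshot()`, which matches the observation that the job runs but no checkpoint completes.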


Si-li Liu <[hidden email]> 于2020年7月3日周五 下午6:44写道:
Thanks for your help

1. I started the job from scratch, not a savepoint or externalized checkpoint
2. No job graph change
3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
4. My Flink version is 1.9.1

Khachatryan Roman <[hidden email]> 于2020年7月3日周五 下午4:49写道:
I still wasn't able to reproduce the issue. 

Can you also clarify:
- Are you starting the job from a savepoint or externalized checkpoint? 
- If yes, was the job graph changed?
- What StreamTimeCharacteristic is set, if any?
- What exact version of Flink do you use?

Regards,
Roman


On Fri, Jul 3, 2020 at 6:38 AM Si-li Liu <[hidden email]> wrote:
Hi, Thanks for your help.

The checkpoint configuration is

checkpoint.intervalMS=300000
checkpoint.timeoutMS=300000

The error callstack is from JM's log, which happened in every cp. Currently I don't have a success cp yet.

Khachatryan Roman <[hidden email]> 于2020年7月3日周五 上午3:50写道:
Hi,

Thanks for the details.
However, I was not able to reproduce the issue. I used parallelism levels 4, file system backend and tried different timings for checkpointing, windowing and source.
Do you encounter this problem deterministically, is it always 1st checkpoint? 
What checkpointing interval do you use?

Regards,
Roman


On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu <[hidden email]> wrote:
Hi, this is our production code so I have to modify it a little bit, such as variable name and function name. I think 3 classes I provide here is enough.

I try to join two streams, but I don't want to use the default join function, because I want to send the joined log immediately and remove it from window state immediately. And my window gap time is very long( 20 minutes), so it maybe evaluate it multiple times.
class JoinFunction extends
ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow]{

var ueState: ValueState[RawLog] = _
@transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
val invalidCounter = new LongCounter()
val processCounter = new LongCounter()
val sendToKafkaCounter = new LongCounter()

override def open(parameters: Configuration): Unit = {
ueState = getRuntimeContext.getState(
new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
)
gZipThriftSerializer = new GZipThriftSerializer[MyType]()
getRuntimeContext.addAccumulator("processCounter", this.processCounter)
getRuntimeContext.addAccumulator("invalidCounter", this.invalidCounter)
getRuntimeContext.addAccumulator("sendToKafkaCounter", this.sendToKafkaCounter)
}

override def process(key: String,
ctx: Context,
logs: Iterable[RawLog],
out: Collector[OutputLog]): Unit = {
if (ueState.value() != null) {
processCounter.add(1L)
val bid = ueState.value()
val bidLog = gZipThriftSerializer.decompressAndDeserialize(bid.payload, classOf[MyType])
logs.foreach( log => {
if (log.eventType == SHOW) {
val showLog = gZipThriftSerializer.decompressAndDeserialize(log.payload, classOf[MyType])
sendToKafkaCounter.add(1L)
out.collect(new OutputLog(ThriftUtils.serialize(showLog), Utils.getOutputTopic(showLog)))
}
})
} else {
invalidCounter.add(1L)
}
}
}
class JoinTrigger extends Trigger[RawLog, TimeWindow] {

override def onElement(log: RawLog,
timestamp: Long,
window: TimeWindow,
ctx: Trigger.TriggerContext): TriggerResult = {
val ueState: ValueState[RawLog] = ctx.getPartitionedState(
new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
)
val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))

if (!firstSeen.value()) {
ctx.registerEventTimeTimer(window.getEnd)
firstSeen.update(true)
}
val eventType = log.eventType
if (eventType == BID) {
ueState.update(log)
TriggerResult.CONTINUE
} else {
if (ueState.value() == null) {
TriggerResult.CONTINUE
} else {
TriggerResult.FIRE
}
}
}

override def onEventTime(timestamp: Long,
window: TimeWindow,
ctx: Trigger.TriggerContext): TriggerResult = {
if (timestamp == window.getEnd) {
TriggerResult.PURGE
}
TriggerResult.CONTINUE
}

override def onProcessingTime(timestamp: Long,
window: TimeWindow,
ctx: Trigger.TriggerContext): TriggerResult = {
TriggerResult.CONTINUE
}

override def clear(window: TimeWindow,
ctx: Trigger.TriggerContext): Unit = {
val ueState: ValueState[RawLog] = ctx.getPartitionedState(
new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
)
ueState.clear()

val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
firstSeen.clear()

ctx.deleteEventTimeTimer(window.getEnd)
}
}
class JoinEvictor extends Evictor[RawLog, TimeWindow] {

override def evictBefore(elements: JIterable[TimestampedValue[RawLog]],
size: Int,
window: TimeWindow,
evictorContext: Evictor.EvictorContext): Unit = {}

override def evictAfter(elements: JIterable[TimestampedValue[RawLog]],
size: Int,
window: TimeWindow,
evictorContext: Evictor.EvictorContext): Unit = {
val iter = elements.iterator()
while (iter.hasNext) {
iter.next()
iter.remove()
}
}
}

Khachatryan Roman <[hidden email]> 于2020年7月2日周四 下午7:18写道:
Thanks for the clarification.

Can you also share the code of other parts, particularly MyFunction?

Regards,
Roman


On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu <[hidden email]> wrote:
Rocksdb backend has the same problem

Khachatryan Roman <[hidden email]> 于2020年7月2日周四 下午6:11写道:
Thanks for reporting this.

Looks like the window namespace was replaced by VoidNamespace in state entry.
I've created https://issues.apache.org/jira/browse/FLINK-18464 to further investigate it.

Regards,
Roman


On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <[hidden email]> wrote:
I'm using flink 1.9 on Mesos and I try to use my own trigger and evictor. The state is stored to memory.

input.setParallelism(processParallelism)
        .assignTimestampsAndWatermarks(new UETimeAssigner)
        .keyBy(_.key)
        .window(TumblingEventTimeWindows.of(Time.minutes(20)))
        .trigger(new MyTrigger)
        .evictor(new MyEvictor)
        .process(new MyFunction).setParallelism(aggregateParallelism)
        .addSink(kafkaSink).setParallelism(sinkParallelism)
        .name("kafka-record-sink")

And the exception stack is here, could anyone help with this? Thanks!

java.lang.Exception: Could not materialize checkpoint 1 for operator Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor, ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
    ... 3 more
Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
    at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:114)
    at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
    at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
    at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
    ... 5 more

--
Best regards

Sili Liu



Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

Si-li Liu
Sorry,

I can't reproduce the problem with reduce/aggregate/fold/apply, and due to some limitations in my working environment I can't use Flink 1.10 or 1.11.

On Sun, Jul 5, 2020 at 6:21 PM Congxian Qiu <[hidden email]> wrote:
Hi

First, could you please check whether this problem still occurs with Flink 1.10 or 1.11?

It seems strange. Judging from the error message, the failure happens when state that carries a window namespace (TimeWindow) is written with the serializer for non-window state (VoidNamespaceSerializer), so the serializer and the state do not match.
Could you please try replacing MyFunction with a reduce/aggregate/fold/apply() function to see what happens? This would help narrow down the problem.

Best,
Congxian


On Fri, Jul 3, 2020 at 6:44 PM Si-li Liu <[hidden email]> wrote:
Thanks for your help

1. I started the job from scratch, not a savepoint or externalized checkpoint
2. No job graph change
3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
4. My Flink version is 1.9.1

On Fri, Jul 3, 2020 at 4:49 PM Khachatryan Roman <[hidden email]> wrote:
I still wasn't able to reproduce the issue. 

Can you also clarify:
- Are you starting the job from a savepoint or externalized checkpoint? 
- If yes, was the job graph changed?
- What StreamTimeCharacteristic is set, if any?
- What exact version of Flink do you use?

Regards,
Roman


On Fri, Jul 3, 2020 at 6:38 AM Si-li Liu <[hidden email]> wrote:
Hi, Thanks for your help.

The checkpoint configuration is

checkpoint.intervalMS=300000
checkpoint.timeoutMS=300000

The error call stack is from the JobManager's log, and it occurs on every checkpoint. So far I haven't had a single successful checkpoint.
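
For reference, a minimal sketch of how these two values would typically be applied with the Flink 1.9 Scala API. The keys checkpoint.intervalMS and checkpoint.timeoutMS look like application-level settings rather than Flink options, so the mapping below is an assumption, and the object name is made up for illustration.

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object CheckpointSetupSketch {
  def main(args: Array[String]): Unit = {
    // Assumed to mirror checkpoint.intervalMS and checkpoint.timeoutMS above.
    val intervalMs = 300000L
    val timeoutMs = 300000L

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Trigger a checkpoint every intervalMs milliseconds.
    env.enableCheckpointing(intervalMs)
    // Abort a checkpoint that has not completed within timeoutMs milliseconds.
    env.getCheckpointConfig.setCheckpointTimeout(timeoutMs)
  }
}
```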

On Fri, Jul 3, 2020 at 3:50 AM Khachatryan Roman <[hidden email]> wrote:
Hi,

Thanks for the details.
However, I was not able to reproduce the issue. I used a parallelism of 4 and the file system backend, and tried different timings for checkpointing, windowing and the source.
Do you encounter this problem deterministically? Is it always the 1st checkpoint?
What checkpointing interval do you use?

Regards,
Roman


On Thu, Jul 2, 2020 at 1:57 PM Si-li Liu <[hidden email]> wrote:
Hi, this is our production code, so I had to modify it a little, such as variable and function names. I think the 3 classes I provide here are enough.

I'm trying to join two streams, but I don't want to use the default join function, because I want to send the joined log immediately and remove it from the window state immediately. My window is very long (20 minutes), so it may be evaluated multiple times.
// Imports for the Flink 1.9 Scala API. RawLog, OutputLog, MyType,
// GZipThriftSerializer, ThriftUtils, Utils and the constants bidState,
// initState, BID and SHOW belong to the production code and are not shown.
import java.lang.{Iterable => JIterable}

import org.apache.flink.api.common.accumulators.LongCounter
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.evictors.Evictor
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue
import org.apache.flink.util.Collector

class JoinFunction extends
  ProcessWindowFunction[RawLog, OutputLog, String, TimeWindow] {

  // Per-key state holding the latest BID log. The descriptor name (bidState)
  // is the same one JoinTrigger uses for its partitioned state.
  var ueState: ValueState[RawLog] = _
  @transient var gZipThriftSerializer: GZipThriftSerializer[MyType] = _
  val invalidCounter = new LongCounter()
  val processCounter = new LongCounter()
  val sendToKafkaCounter = new LongCounter()

  override def open(parameters: Configuration): Unit = {
    ueState = getRuntimeContext.getState(
      new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
    )
    gZipThriftSerializer = new GZipThriftSerializer[MyType]()
    getRuntimeContext.addAccumulator("processCounter", this.processCounter)
    getRuntimeContext.addAccumulator("invalidCounter", this.invalidCounter)
    getRuntimeContext.addAccumulator("sendToKafkaCounter", this.sendToKafkaCounter)
  }

  // Emits every SHOW log in the window once a BID log has been seen for the key.
  override def process(key: String,
                       ctx: Context,
                       logs: Iterable[RawLog],
                       out: Collector[OutputLog]): Unit = {
    if (ueState.value() != null) {
      processCounter.add(1L)
      val bid = ueState.value()
      // bidLog is not used further in this excerpt.
      val bidLog = gZipThriftSerializer.decompressAndDeserialize(bid.payload, classOf[MyType])
      logs.foreach { log =>
        if (log.eventType == SHOW) {
          val showLog = gZipThriftSerializer.decompressAndDeserialize(log.payload, classOf[MyType])
          sendToKafkaCounter.add(1L)
          out.collect(new OutputLog(ThriftUtils.serialize(showLog), Utils.getOutputTopic(showLog)))
        }
      }
    } else {
      invalidCounter.add(1L)
    }
  }
}

class JoinTrigger extends Trigger[RawLog, TimeWindow] {

  override def onElement(log: RawLog,
                         timestamp: Long,
                         window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    val ueState: ValueState[RawLog] = ctx.getPartitionedState(
      new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
    )
    val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
      new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))

    // Register the cleanup timer once per key and window.
    if (!firstSeen.value()) {
      ctx.registerEventTimeTimer(window.getEnd)
      firstSeen.update(true)
    }
    val eventType = log.eventType
    if (eventType == BID) {
      // Remember the BID log and keep buffering.
      ueState.update(log)
      TriggerResult.CONTINUE
    } else {
      // Fire only if a BID log has already been seen.
      if (ueState.value() == null) {
        TriggerResult.CONTINUE
      } else {
        TriggerResult.FIRE
      }
    }
  }

  override def onEventTime(timestamp: Long,
                           window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult = {
    // Purge at the window end. The original version had an `if` without an
    // `else`, so the PURGE value was discarded and CONTINUE was always returned.
    if (timestamp == window.getEnd) TriggerResult.PURGE
    else TriggerResult.CONTINUE
  }

  override def onProcessingTime(timestamp: Long,
                                window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }

  override def clear(window: TimeWindow,
                     ctx: Trigger.TriggerContext): Unit = {
    val ueState: ValueState[RawLog] = ctx.getPartitionedState(
      new ValueStateDescriptor[RawLog](bidState, classOf[RawLog])
    )
    ueState.clear()

    val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
      new ValueStateDescriptor[Boolean](initState, classOf[Boolean]))
    firstSeen.clear()

    ctx.deleteEventTimeTimer(window.getEnd)
  }
}

class JoinEvictor extends Evictor[RawLog, TimeWindow] {

  override def evictBefore(elements: JIterable[TimestampedValue[RawLog]],
                           size: Int,
                           window: TimeWindow,
                           evictorContext: Evictor.EvictorContext): Unit = {}

  // Removes every element after the window function has run, so each log
  // is handed to the function at most once.
  override def evictAfter(elements: JIterable[TimestampedValue[RawLog]],
                          size: Int,
                          window: TimeWindow,
                          evictorContext: Evictor.EvictorContext): Unit = {
    val iter = elements.iterator()
    while (iter.hasNext) {
      iter.next()
      iter.remove()
    }
  }
}
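
The intended behavior of the three classes above can be sketched as a toy model without Flink. This is only an illustration for a single key and a single window: the Event types and the run function are hypothetical simplifications, not part of the job, and timers and purging are left out.

```scala
import scala.collection.mutable

object FireAndEvictSketch {
  // Hypothetical, simplified events; the real job uses RawLog with an eventType field.
  sealed trait Event
  final case class Bid(id: String) extends Event
  final case class Show(id: String) extends Event

  /** Replays events for one key and one window; returns the ids of emitted shows. */
  def run(events: Seq[Event]): Vector[String] = {
    var bidSeen = false                            // stands in for ueState
    val window = mutable.ArrayBuffer.empty[Event]  // stands in for the window contents
    val emitted = Vector.newBuilder[String]

    events.foreach { event =>
      window += event
      event match {
        case Bid(_) =>
          bidSeen = true                           // trigger returns CONTINUE
        case Show(_) if bidSeen =>
          // Trigger returns FIRE: the function emits every buffered show ...
          window.foreach {
            case Show(id) => emitted += id
            case Bid(_)   => ()
          }
          // ... and evictAfter then removes all elements from the window.
          window.clear()
        case Show(_) =>
          ()                                       // no bid yet: CONTINUE, keep buffering
      }
    }
    emitted.result()
  }
}
```

In this model the window fires more than once, but because everything is evicted after each firing, every show is emitted exactly once.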

On Thu, Jul 2, 2020 at 7:18 PM Khachatryan Roman <[hidden email]> wrote:
Thanks for the clarification.

Can you also share the code of other parts, particularly MyFunction?

Regards,
Roman


On Thu, Jul 2, 2020 at 12:49 PM Si-li Liu <[hidden email]> wrote:
The RocksDB backend has the same problem.

On Thu, Jul 2, 2020 at 6:11 PM Khachatryan Roman <[hidden email]> wrote:
Thanks for reporting this.

It looks like the window namespace was replaced by VoidNamespace in the state entry.
I've created https://issues.apache.org/jira/browse/FLINK-18464 to further investigate it.
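
One scenario that would produce this kind of cast failure, offered as a hypothesis and not confirmed anywhere in this thread: the JoinFunction and JoinTrigger posted above both create a ValueStateDescriptor named bidState, one through getRuntimeContext.getState and one through ctx.getPartitionedState. If a backend keeps one table per state name and fixes the namespace serializer at first registration, a later write under a different namespace type only fails when the table is serialized. The toy model below uses made-up stand-in classes, not Flink's, to show that mechanism.

```scala
import scala.collection.mutable

object NamespaceMismatchSketch {
  // Hypothetical stand-ins for illustration only; these are NOT Flink's classes.
  final case class TimeWindow(start: Long, end: Long)
  case object VoidNamespace

  trait NamespaceSerializer[N] { def serialize(namespace: N): String }

  object VoidNamespaceSerializer extends NamespaceSerializer[VoidNamespace.type] {
    def serialize(namespace: VoidNamespace.type): String = "void"
  }
  object TimeWindowSerializer extends NamespaceSerializer[TimeWindow] {
    def serialize(namespace: TimeWindow): String = s"${namespace.start}-${namespace.end}"
  }

  // One table per state name; the serializer is fixed when the table is created.
  final class StateTable(val serializer: NamespaceSerializer[_]) {
    val entries = mutable.LinkedHashMap.empty[Any, String]
  }

  final class ToyBackend {
    private val tablesByName = mutable.Map.empty[String, StateTable]

    // A later write under the same name reuses the table and ignores the
    // serializer passed in, which is the assumption this sketch explores.
    def write[N](name: String, serializer: NamespaceSerializer[N],
                 namespace: N, value: String): Unit = {
      val table = tablesByName.getOrElseUpdate(name, new StateTable(serializer))
      table.entries.update(namespace, value)
    }

    // Serializes every namespace with the serializer stored in its table.
    def snapshot(): Vector[String] =
      tablesByName.values.toVector.flatMap { table =>
        val s = table.serializer.asInstanceOf[NamespaceSerializer[Any]]
        table.entries.toVector.map { case (ns, v) => s.serialize(ns) + "=" + v }
      }
  }
}
```

Writing to the same name first with VoidNamespace and then with a TimeWindow succeeds, and the ClassCastException only surfaces in snapshot(), which is similar in shape to a failure that appears at checkpoint time rather than when the state is updated. Whether this is what happens in the job above would need to be verified, for example by giving the two descriptors different names.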

Regards,
Roman


On Thu, Jul 2, 2020 at 10:52 AM Si-li Liu <[hidden email]> wrote:
I'm using Flink 1.9 on Mesos, and I'm trying to use my own trigger and evictor. The state is stored in memory.

input.setParallelism(processParallelism)
        .assignTimestampsAndWatermarks(new UETimeAssigner)
        .keyBy(_.key)
        .window(TumblingEventTimeWindows.of(Time.minutes(20)))
        .trigger(new MyTrigger)
        .evictor(new MyEvictor)
        .process(new MyFunction).setParallelism(aggregateParallelism)
        .addSink(kafkaSink).setParallelism(sinkParallelism)
        .name("kafka-record-sink")

Here is the exception stack trace. Could anyone help with this? Thanks!

java.lang.Exception: Could not materialize checkpoint 1 for operator Window(TumblingEventTimeWindows(1200000), JoinTrigger, JoinEvictor, ScalaProcessWindowFunctionWrapper) -> Sink: kafka-record-sink (2/5).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
    ... 3 more
Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.windowing.windows.TimeWindow cannot be cast to org.apache.flink.runtime.state.VoidNamespace
    at org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:114)
    at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
    at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
    at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
    ... 5 more

--
Best regards

Sili Liu



Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

Congxian Qiu
Hi Si-li

Thanks for the notice. 
I just want to double-check: has the original problem been solved? I found that the issue created for it, FLINK-18464, has been closed with the reason "can not reproduce". Am I missing something here?

Best,
Congxian



Re: Checkpoint failed because of TimeWindow cannot be cast to VoidNamespace

Si-li Liu
Someone told me that this issue may be Mesos-specific. I'm fairly new to Flink, and I dug into the code but could not reach a conclusion. All I want is a better join window that emits the result and deletes it from the window state immediately once the join succeeds. Is there any other way to do this? Thanks!
