Hi!
Lately I seem to be hitting a bug in the RocksDB timer service. This happens mostly at checkpoints, but sometimes even at watermarks:

java.lang.RuntimeException: Exception occurred while processing valve output watermark:
	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler1.handleWatermark(StreamTwoInputProcessor.java:330)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:220)
	at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:117)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkRuntimeException: org.rocksdb.RocksDBException: Invalid column family specified in write batch
	at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.removeFromRocksDB(RocksDBCachingPriorityQueueSet.java:333)
	at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.poll(RocksDBCachingPriorityQueueSet.java:166)
	at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.poll(RocksDBCachingPriorityQueueSet.java:56)
	at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.poll(KeyGroupPartitionedPriorityQueue.java:97)
	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:249)
	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
	at com.king.rbea.backend.operators.scriptexecution.RbeaOperator.processWatermark(RbeaOperator.java:193)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark1(AbstractStreamOperator.java:793)
	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler1.handleWatermark(StreamTwoInputProcessor.java:327)
	... 7 more
Caused by: org.rocksdb.RocksDBException: Invalid column family specified in write batch
	at org.rocksdb.RocksDB.write0(Native Method)
	at org.rocksdb.RocksDB.write(RocksDB.java:602)
	at org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper.flush(RocksDBWriteBatchWrapper.java:95)
	at org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper.remove(RocksDBWriteBatchWrapper.java:89)
	at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.removeFromRocksDB(RocksDBCachingPriorityQueueSet.java:331)

Has anyone seen this yet? I don't remember seeing this before 1.7.

Gyula
Hi,
I have never seen this before. I would expect this exception when a flushed write batch contains a write against a column family that does not exist (anymore). However, we initialize everything relevant in RocksDBCachingPriorityQueueSet as final (the CF handle), and we never drop any column families or exchange the db instances used with the write batch, except after the timer service and write batch are already closed, in dispose(). It would be nice if they had added the name of the missing CF to the exception.

The last remove is not necessarily the culprit; it is just what happened to trigger the flush. But it could be the culprit, because any batched op could be. That you observe this near checkpoints and watermarks is not surprising, because those are two points where flushes are likely to happen.

Do you have any custom modifications that can drop column families? I cannot see where a CF could get lost in the vanilla Flink code. Is there any other particular circumstance around this happening, e.g. the first flush after a restore or something like that?

Best,
Stefan
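The point that the op triggering the flush need not be the faulty op can be sketched with a toy write batch. This is not Flink's or RocksDB's actual API, just a minimal illustration of deferred validation in a batched write path:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Toy write batch: ops are buffered and only validated when the batch flushes,
// so the op that *triggers* the flush is not necessarily the invalid one.
class ToyWriteBatch {
    private final List<String> bufferedCfWrites = new ArrayList<>();
    private final Set<String> validColumnFamilies;
    private final int capacity;

    ToyWriteBatch(Set<String> validColumnFamilies, int capacity) {
        this.validColumnFamilies = validColumnFamilies;
        this.capacity = capacity;
    }

    void remove(String columnFamily) {
        bufferedCfWrites.add(columnFamily);
        if (bufferedCfWrites.size() >= capacity) {
            flush(); // the error surfaces here, at whatever op filled the batch
        }
    }

    void flush() {
        for (String cf : bufferedCfWrites) {
            if (!validColumnFamilies.contains(cf)) {
                throw new RuntimeException(
                        "Invalid column family specified in write batch: " + cf);
            }
        }
        bufferedCfWrites.clear();
    }
}

public class Demo {
    public static void main(String[] args) {
        ToyWriteBatch batch = new ToyWriteBatch(Set.of("timers"), 3);
        batch.remove("dropped-cf"); // the real culprit, buffered silently
        batch.remove("timers");
        try {
            batch.remove("timers"); // an innocent op triggers flush and the error
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The stack trace then blames the innocent remove, which is exactly why the frame at RocksDBWriteBatchWrapper.remove above says little about which write was bad.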
Thanks for the tip Stefan, you are probably right that this might be related to a custom change. We have a change that deletes every state that hasn't been registered in the open method, and maybe it accidentally deletes the timer service state as well; I need to check.

Thanks!
Gyula

On Tue, Jan 15, 2019 at 10:42 AM Stefan Richter <[hidden email]> wrote:
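A cleanup of the kind Gyula describes has to exclude Flink's internal column families, not just user-registered ones. A minimal sketch of that guard (the CF names and the `_timer_state` prefix are illustrative assumptions, not confirmed Flink internals):

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class StateCleanupSketch {
    // Column families registered by user code in open() (hypothetical names).
    static final Set<String> REGISTERED = Set.of("user-counts", "user-sessions");

    // Decide which column families are safe to drop: everything that is neither
    // user-registered nor internal. Timer state lives in internal CFs (assumed
    // here to share a "_timer_state" prefix), so a naive "drop everything
    // unregistered" cleanup would destroy it.
    static List<String> droppable(List<String> allColumnFamilies) {
        return allColumnFamilies.stream()
                .filter(cf -> !REGISTERED.contains(cf))
                .filter(cf -> !cf.startsWith("_timer_state")) // guard internal timer CFs
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> all =
                List.of("user-counts", "stale-state", "_timer_state/event_user-timers");
        System.out.println(droppable(all)); // only the genuinely stale user state
    }
}
```

Without the prefix filter, the timer CF would be dropped while the timer service's cached CF handle still points at it, which matches the "Invalid column family specified in write batch" failure mode discussed above.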