@Override
public void serialize(T t, DataOutputView dataOutputView) throws IOException {
    // Write a length prefix so deserialize() knows how many bytes to read.
    final int serializedSize = t.getSerializedSize();
    dataOutputView.writeInt(serializedSize);
    final byte[] data = new byte[serializedSize];
    t.writeTo(CodedOutputStream.newInstance(data));
    dataOutputView.write(data);
}

@Override
public T deserialize(DataInputView dataInputView) throws IOException {
    final int serializedSize = dataInputView.readInt();
    final com.google.protobuf.Parser<T> parser = Unchecked.cast(prototype.getParserForType());
    final byte[] data = new byte[serializedSize];
    // readFully() fills the whole array; read() may return after fewer bytes.
    dataInputView.readFully(data);
    return parser.parseFrom(CodedInputStream.newInstance(data));
}
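
The length-prefixed framing used by the serializer can be checked in isolation. Below is a minimal sketch that uses only java.io and assumes a plain byte[] stands in for the protobuf message. The class and method names (LengthPrefixedFraming, write, read, roundTrip) are hypothetical and are not part of Flink or of the job described in this post. It reads with readFully(), which fills the whole array, whereas read(byte[]) may return after fewer bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical helper: illustrates the framing only, not a Flink TypeSerializer.
final class LengthPrefixedFraming {

    // Write a 4-byte length prefix followed by the payload bytes.
    static void write(byte[] payload, DataOutput out) throws IOException {
        out.writeInt(payload.length);
        out.write(payload);
    }

    // Read the length prefix, then exactly that many bytes.
    static byte[] read(DataInput in) throws IOException {
        final int size = in.readInt();
        final byte[] data = new byte[size];
        in.readFully(data); // throws EOFException if the stream ends early
        return data;
    }

    // Serialize into an in-memory buffer and read the payload back.
    static byte[] roundTrip(byte[] payload) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            write(payload, new DataOutputStream(buffer));
            return read(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] payload = {1, 2, 3, 4, 5};
        System.out.println(Arrays.equals(payload, roundTrip(payload)));
    }
}
```

A round trip like this only exercises the byte framing; it says nothing about whether the state is actually restored from the checkpoint.
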
Hello,
I'm using Flink 1.11.3 with the RocksDB state backend. I have a streaming job that reads from Kafka, transforms the data, and writes the output to Kafka. One of the processing nodes is a KeyedCoProcessFunction with a ValueState. This is what I observe:
- I generate some input data; the log shows that state.update() is called and that a subsequent state.value() returns a non-null value
- I wait for a checkpoint
- I restart the taskmanager
- state.value() returns null
I tried changing the state backend from RocksDB to filesystem, with the same result: after the taskmanager restart, state.value() returns null.
Any ideas what could cause the state to be reset to null?
Thanks, Alexey