state reset(lost) on TM recovery

Alexey Trenikhun (January 10, 2021)
Hello,

I'm using Flink 1.11.3 with the RocksDB state backend. I have a streaming job that reads from Kafka, transforms the data, and writes the output to Kafka. One of the processing nodes is a KeyedCoProcessFunction with a ValueState:
  1. I generate some input data; the log shows that state.update() is called and that a subsequent state.value() returns a non-null value.
  2. I wait for a checkpoint.
  3. I restart the TaskManager.
  4. state.value() now returns null.
I've tried changing the backend from RocksDB to filesystem, with the same result: after the TaskManager restart, state.value() returns null.

Any ideas what could cause the state to be reset to null?

Thanks,
Alexey

Re: state reset(lost) on TM recovery

Chesnay Schepler (January 11, 2021)
Just to double-check: are you aware that ValueState within a Keyed*Function is scoped to the key of the input element(s)? That is, a stored value is only accessible while an element with the same key is being processed.
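To make the scoping rule concrete, here is a toy model in plain Java. It is not Flink's implementation and uses no Flink API; `KeyScopedValueState` is a hypothetical class in which `value()` and `update()` always go to the slot of the current key, which the runtime would set before each element is processed.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model (hypothetical, not Flink code) of a keyed ValueState:
 * one slot per key, and value()/update() only touch the current key's slot.
 */
class KeyScopedValueState<K, V> {
    private final Map<K, V> slots = new HashMap<>();
    private K currentKey;

    // Stand-in for the runtime setting the key of the element being processed.
    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    V value() {
        return slots.get(currentKey); // null if this key never stored anything
    }

    void update(V value) {
        slots.put(currentKey, value);
    }

    public static void main(String[] args) {
        KeyScopedValueState<String, Integer> state = new KeyScopedValueState<>();
        state.setCurrentKey("a");
        state.update(42);
        state.setCurrentKey("b");
        System.out.println(state.value()); // null: key "b" has its own, empty slot
        state.setCurrentKey("a");
        System.out.println(state.value()); // 42
    }
}
```

Note that even in this toy model the slot is found through the key's equals() and hashCode(), so a key whose hash code changes would miss its own slot.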

Re: state reset(lost) on TM recovery

Alexey Trenikhun (January 12, 2021)
Hello,

Yes, I'm aware. I used elements with the same key and logged getCurrentKey() to make sure the key is the same. But you are right that it is scope-related: the key is a protobuf object, and I specify a custom TypeInformation in keyBy(). Today I changed the code to use a class derived from Tuple2 instead of protobuf, and it started to work. Why it doesn't work with protobuf and the custom type information is unclear: I checked serialize/deserialize and they return an equal object, and everything works until the TM restarts. Are there any special requirements for the TypeSerializer and TypeInformation of key types?

@Override
public void serialize(T t, DataOutputView dataOutputView) throws IOException {
  // Length-prefixed encoding: the message size first, then the protobuf bytes.
  final int serializedSize = t.getSerializedSize();
  dataOutputView.writeInt(serializedSize);
  final byte[] data = new byte[serializedSize];
  t.writeTo(CodedOutputStream.newInstance(data));
  dataOutputView.write(data);
}

@Override
public T deserialize(DataInputView dataInputView) throws IOException {
  final int serializedSize = dataInputView.readInt();
  final com.google.protobuf.Parser<T> parser = Unchecked.cast(prototype.getParserForType());
  final byte[] data = new byte[serializedSize];
  // readFully (inherited from java.io.DataInput) fills the whole array or throws;
  // a single read(data) may return after reading fewer bytes.
  dataInputView.readFully(data);
  return parser.parseFrom(CodedInputStream.newInstance(data));
}
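A related detail for any length-prefixed `deserialize`: Flink's `DataInputView.read(byte[])` returns the number of bytes actually read, which can be fewer than requested, whereas `readFully(byte[])` from `java.io.DataInput` keeps reading until the array is full or throws `EOFException`. The plain-JDK sketch below shows the difference using a deliberately slow stream; `LengthPrefixedCodec` and `TrickleInputStream` are hypothetical names for illustration, and this is independent of the hash-code issue discussed later in the thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

class LengthPrefixedCodec {
    // Framing: 4-byte length, then the payload.
    static void write(byte[] payload, DataOutputStream out) throws IOException {
        out.writeInt(payload.length);
        out.write(payload);
    }

    static byte[] read(DataInputStream in) throws IOException {
        byte[] data = new byte[in.readInt()];
        in.readFully(data); // loops until the array is full, or throws EOFException
        return data;
    }

    /** Test helper: returns at most one byte per bulk read, like a slow network stream. */
    static class TrickleInputStream extends FilterInputStream {
        TrickleInputStream(InputStream in) {
            super(in);
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            return super.read(b, off, Math.min(len, 1));
        }
    }

    static DataInputStream trickle(byte[] bytes) {
        return new DataInputStream(new TrickleInputStream(new ByteArrayInputStream(bytes)));
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        write("hello".getBytes(StandardCharsets.UTF_8), new DataOutputStream(buf));

        DataInputStream partial = trickle(buf.toByteArray());
        byte[] target = new byte[partial.readInt()];
        System.out.println(partial.read(target)); // 1: only one of five bytes was read

        DataInputStream full = trickle(buf.toByteArray());
        System.out.println(new String(read(full), StandardCharsets.UTF_8)); // hello
    }
}
```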



Re: state reset(lost) on TM recovery

Chesnay Schepler (January 12, 2021)
Are the hash codes of these objects equal as well?
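One way to answer this for a custom key type is to check both halves of the equals()/hashCode() contract after a serializer round trip. The helper below is a hypothetical plain-Java sketch in which the `serialize`/`deserialize` functions stand in for a TypeSerializer. Running it inside one JVM cannot reveal a hash code that changes between JVM runs, so it also returns the value, which can be logged and compared across two separate runs.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

/** Hypothetical helper: checks equals() and hashCode() across a serialize/deserialize round trip. */
final class KeyContractCheck {
    static <T> int check(T key, Function<T, byte[]> serialize, Function<byte[], T> deserialize) {
        T copy = deserialize.apply(serialize.apply(key));
        if (!key.equals(copy)) {
            throw new IllegalStateException("round trip broke equals(): " + key + " vs " + copy);
        }
        if (key.hashCode() != copy.hashCode()) {
            throw new IllegalStateException("equal keys report different hashCode() values");
        }
        // Only proves same-JVM consistency; compare this value across two JVM runs as well.
        return key.hashCode();
    }

    public static void main(String[] args) {
        int hash = check("customer-42",
                s -> s.getBytes(StandardCharsets.UTF_8),
                b -> new String(b, StandardCharsets.UTF_8));
        System.out.println("hashCode = " + hash);
    }
}
```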

Re: state reset(lost) on TM recovery

Alexey Trenikhun (January 13, 2021)
That's it! The protobuf compiler generates hashCode() methods that are not stable across JVM restarts ([1]), which explains the observed behavior. A stable hashCode is clearly mandatory for the keys of KeyedProcessFunctions, but is it also a requirement for MapState keys? It looks like the RocksDB backend serializes the key first, so it is not affected by the protobuf hashCode weirdness, but what about the filesystem backend?
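The mechanism can be reproduced without protobuf. To our understanding, protoc-generated Java `hashCode()` methods mix in `getDescriptor().hashCode()`, and the descriptor uses the identity-based `Object.hashCode()`, so equal messages agree within one JVM but the value is not guaranteed to survive a restart. In the sketch below, `ProtoLikeKey` imitates that pattern with a plain `Object`, and `StableBytesKey` is a hypothetical wrapper whose hash depends only on the serialized bytes. Using such a wrapper as a Flink key would still need its own TypeInformation/serializer, which is not shown; keying by a Tuple of plain fields, as described earlier in the thread, also avoids the problem.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class HashStabilityDemo {
    public static void main(String[] args) {
        // The first value is not guaranteed to survive a JVM restart; the second is.
        System.out.println(new ProtoLikeKey("customer-42").hashCode());
        System.out.println(new StableBytesKey("customer-42".getBytes(StandardCharsets.UTF_8)).hashCode());
    }
}

/** Imitates the generated-protobuf hashCode() pattern; illustration only. */
final class ProtoLikeKey {
    // Stand-in for the message descriptor: Object.hashCode() is identity-based.
    static final Object DESCRIPTOR = new Object();
    final String id;

    ProtoLikeKey(String id) {
        this.id = id;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof ProtoLikeKey && ((ProtoLikeKey) o).id.equals(id);
    }

    @Override
    public int hashCode() {
        int hash = 41;
        hash = 19 * hash + DESCRIPTOR.hashCode(); // consistent in this JVM, not across restarts
        hash = 53 * hash + id.hashCode();
        return hash;
    }
}

/** Hypothetical key wrapper whose hash depends only on the serialized bytes. */
final class StableBytesKey {
    private final byte[] bytes;

    StableBytesKey(byte[] bytes) {
        this.bytes = bytes.clone(); // defensive copy: the hash must never change
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof StableBytesKey && Arrays.equals(((StableBytesKey) o).bytes, bytes);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(bytes); // deterministic in every JVM
    }
}
```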



Re: state reset(lost) on TM recovery

Chesnay Schepler (January 13, 2021)
The FsStateBackend makes heavy use of hash codes, so they must be stable.
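A simplified model shows why a hash that changes on restart loses keyed state even though the data was checkpointed. As far as we know, Flink assigns every key to a key group derived from `key.hashCode()` (a murmur hash of it, modulo the max parallelism; see `KeyGroupRangeAssignment`), and keyed state is snapshotted and restored per key group. `KeyGroupedHeapState` below is a hypothetical stand-in: it uses `Math.floorMod` instead of Flink's murmur hash and takes the key hash as an explicit argument, so a "before restart" and an "after restart" hash can be compared.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model of heap keyed state split into key groups.
 * Math.floorMod stands in for Flink's murmur-hash based key-group assignment.
 */
final class KeyGroupedHeapState {
    private final int maxParallelism;
    private final Map<Integer, Map<Object, Object>> groups = new HashMap<>();

    KeyGroupedHeapState(int maxParallelism) {
        this.maxParallelism = maxParallelism;
    }

    int keyGroupFor(int keyHash) {
        return Math.floorMod(keyHash, maxParallelism);
    }

    void put(Object key, int keyHash, Object value) {
        groups.computeIfAbsent(keyGroupFor(keyHash), g -> new HashMap<>()).put(key, value);
    }

    Object get(Object key, int keyHash) {
        Map<Object, Object> group = groups.get(keyGroupFor(keyHash));
        return group == null ? null : group.get(key);
    }

    public static void main(String[] args) {
        KeyGroupedHeapState state = new KeyGroupedHeapState(128);
        int hashBeforeRestart = 7; // e.g. hashCode() in the old TaskManager JVM
        int hashAfterRestart = 8;  // same key, new JVM, different identity-based hash
        state.put("customer-42", hashBeforeRestart, "checkpointed value");
        System.out.println(state.get("customer-42", hashBeforeRestart)); // checkpointed value
        System.out.println(state.get("customer-42", hashAfterRestart));  // null: wrong key group
    }
}
```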

Re: state reset(lost) on TM recovery

Alexey Trenikhun
Ok, thanks.

