State Issue

Navneeth Krishnan
Hi,

I'm experiencing a weird issue: any data put into map state comes back as null when retrieved with the same key, so the same value is put again and again. I was using the RocksDB state backend, but I also tried the memory state backend and the issue still exists.

Each time I set a key and value in the MapState, it creates a new map and I can't access the previous value. But when I iterate over the MapState keys and values, I can see the same key added multiple times.

Each put operation goes through the code lines marked in red.
NestedMapsStateTable.java
    S get(K key, int keyGroupIndex, N namespace) {
        checkKeyNamespacePreconditions(key, namespace);

        Map<N, Map<K, S>> namespaceMap = getMapForKeyGroup(keyGroupIndex);
        if (namespaceMap == null) {
            return null;
        }

        Map<K, S> keyedMap = namespaceMap.get(namespace);
        if (keyedMap == null) {
            return null;
        }

        return keyedMap.get(key);
    }

HeapMapState.java
    @Override
    public void put(UK userKey, UV userValue) {
        HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
        if (userMap == null) {
            userMap = new HashMap<>();
            stateTable.put(currentNamespace, userMap);
        }

        userMap.put(userKey, userValue);
    }
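
The two excerpts above suggest that the user map is looked up per key group, namespace, and current key, so a map stored under one stream key is not visible under another. The following is a minimal sketch of that scoping using only the JDK. It is a simplified model, not Flink's implementation: the classes `StateTableSketch` and `MapStateSketch`, the `setCurrentKey` method, the `"default"` namespace, and the modulo-based key-group assignment are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class StateScopeDemo {
    public static void main(String[] args) {
        MapStateSketch<String, String, String> state = new MapStateSketch<>();

        state.setCurrentKey("stream-key-A");
        state.put("user-1", "info");
        System.out.println(state.contains("user-1")); // true

        // Same user key, but a different current (stream) key: a different map.
        state.setCurrentKey("stream-key-B");
        System.out.println(state.contains("user-1")); // false
    }
}

// Hypothetical stand-in for a heap state table: key group -> namespace -> key -> state.
class StateTableSketch<K, N, S> {
    private final Map<Integer, Map<N, Map<K, S>>> groups = new HashMap<>();
    private final int numKeyGroups;

    StateTableSketch(int numKeyGroups) {
        this.numKeyGroups = numKeyGroups;
    }

    // Assumption: a plain modulo of hashCode; the real assignment differs.
    private int keyGroupFor(K key) {
        return Math.floorMod(key.hashCode(), numKeyGroups);
    }

    S get(K key, N namespace) {
        Map<N, Map<K, S>> namespaceMap = groups.get(keyGroupFor(key));
        if (namespaceMap == null) {
            return null;
        }
        Map<K, S> keyedMap = namespaceMap.get(namespace);
        if (keyedMap == null) {
            return null;
        }
        return keyedMap.get(key);
    }

    void put(K key, N namespace, S state) {
        groups.computeIfAbsent(keyGroupFor(key), g -> new HashMap<>())
              .computeIfAbsent(namespace, n -> new HashMap<>())
              .put(key, state);
    }
}

// Hypothetical map state whose user map is scoped to the current key.
class MapStateSketch<K, UK, UV> {
    private static final String NAMESPACE = "default"; // placeholder namespace
    private final StateTableSketch<K, String, HashMap<UK, UV>> table =
            new StateTableSketch<>(128);
    private K currentKey;

    // Must be called before contains() or put().
    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    boolean contains(UK userKey) {
        HashMap<UK, UV> userMap = table.get(currentKey, NAMESPACE);
        return userMap != null && userMap.containsKey(userKey);
    }

    void put(UK userKey, UV userValue) {
        HashMap<UK, UV> userMap = table.get(currentKey, NAMESPACE);
        if (userMap == null) {
            userMap = new HashMap<>();
            table.put(currentKey, NAMESPACE, userMap);
        }
        userMap.put(userKey, userValue);
    }
}
```

In this model, a `contains` call that keeps returning false for a key that was just put is a hint that the current key, not the user key, changed between the two calls.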

My Code:
open():
    MapStateDescriptor<String, String> testStateDescriptor = new MapStateDescriptor<>("test-state",
            TypeInformation.of(new TypeHint<String>() {}), TypeInformation.of(new TypeHint<String>() {}));
    testState = getRuntimeContext().getMapState(testStateDescriptor);

flatMap:
    if (testState.contains(user)) {
        // DO Something
    } else {
        testState.put(user, userInfo);
    }

    streamEnv.setStateBackend(new MemoryStateBackend());
    streamEnv.setParallelism(1);

Thanks

Re: State Issue

Navneeth Krishnan
Sorry, my bad. I figured out it was a change made on our end that created different keys. Thanks.
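
The thread does not say what the change was, so the following is only a hypothetical illustration of how keys that look identical when printed can still be different keys. It replays the contains/put pattern from the question on a plain `HashMap`, using a trailing space as the assumed difference, and adds a small helper that makes such differences visible. The class `KeyMismatchDemo` and its methods are invented for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

class KeyMismatchDemo {
    // Replays "if (!contains(key)) put(key, value)" and reports how many
    // distinct entries end up in the map.
    static int putIfAbsentCount(String... keys) {
        Map<String, String> state = new HashMap<>();
        for (String key : keys) {
            if (!state.containsKey(key)) {
                state.put(key, "userInfo");
            }
        }
        return state.size();
    }

    // Brackets and the length expose whitespace that plain printing hides.
    static String describe(String key) {
        return "[" + key + "] length=" + key.length();
    }

    public static void main(String[] args) {
        System.out.println(putIfAbsentCount("user-42", "user-42"));  // 1
        System.out.println(putIfAbsentCount("user-42", "user-42 ")); // 2
        System.out.println(describe("user-42 "));                    // [user-42 ] length=8
    }
}
```

Logging keys this way at the point where they are built is one way to confirm whether two puts really used the same key.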

On Fri, Sep 8, 2017 at 5:32 PM, Navneeth Krishnan <[hidden email]> wrote: