MapState not support transfer value

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

MapState not support transfer value

shengjk1
Hi,all

This is my code 

.process(new KeyedProcessFunction<String, Tuple2<String, Object>, String>() {
        ...
      @Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(7))
.cleanupInBackground()
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)//default
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
MapStateDescriptor noMatchDescriptor = new MapStateDescriptor<String, List<Object>>(
"kuduError",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<List<Object>>() {
})
);
...
}
...
@Override
public void processElement(Tuple2<String, Object> stringObjectTuple2, Context context, Collector<String> collector) throws Exception {
...
List<Object> objectList = new ArrayList<>();
if (mapState.contains(key)) {
objectList = mapState.get(key);
}else{
mapState.put(key,objectList);
}
logger.info("key{} add before objectList.size{}",key,objectList.size());
objectList.add(stringObjectTuple2.f1);
logger.info("key{} add after objectList.size{}",key,objectList.size());
objectList = mapState.get(key);
logger.info("key{} mapState objectList.size{}",key,objectList.size());
                                ...
}


If I run this in idea  the  log detail:


Best,
Shengjk1


Reply | Threaded
Open this post in threaded view
|

Re: MapState not support transfer value

Dian Fu
Hi Shengjk1,

You should call "mapState.put(key,objectList);" manually after calling "objectList.add(stringObjectTuple2.f1);" to write it to the state backend. This is because objectList is just a common Java list object and it will not be synced to state backend automatic when updated.

Regards,
Dian

在 2019年9月19日,下午6:59,shengjk1 <[hidden email]> 写道:

Hi,all

This is my code 

.process(new KeyedProcessFunction<String, Tuple2<String, Object>, String>() {
        ...
       @Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(7))
.cleanupInBackground()
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)//default
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
MapStateDescriptor noMatchDescriptor = new MapStateDescriptor<String, List<Object>>(
"kuduError",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<List<Object>>() {
})
);
...
}
...
@Override
public void processElement(Tuple2<String, Object> stringObjectTuple2, Context context, Collector<String> collector) throws Exception {
...
List<Object> objectList = new ArrayList<>();
if (mapState.contains(key)) {
objectList = mapState.get(key);
}else{
mapState.put(key,objectList);
}
logger.info("key{} add before objectList.size{}",key,objectList.size());
objectList.add(stringObjectTuple2.f1);
logger.info("key{} add after objectList.size{}",key,objectList.size());
objectList = mapState.get(key);
logger.info("key{} mapState objectList.size{}",key,objectList.size());
                                ...
}


If I run this in idea  the  log detail:


Best,
Shengjk1