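Hi all,
As we know, a Java Map hands back a reference to the stored value, so modifying the returned list in place also changes what the map holds. For example: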
HashMap<String, List<String>> stringListHashMap = new HashMap<>();
for (int i = 0; i < 10; i++) {
    List<String> a = stringListHashMap.get("a" + i % 2);
    if (a == null) {
        a = new ArrayList<>();
        stringListHashMap.put("a" + i % 2, a);
    }
    a.add("a" + i);
}
stringListHashMap.keySet().forEach(x -> System.out.println("keys========= " + x));
stringListHashMap.get("a0").forEach(x -> System.out.println("========x=== " + x));
Result:
keys========= a1
keys========= a0
========x=== a0
========x=== a2
========x=== a4
========x=== a6
========x=== a8
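But when the job runs on a Flink cluster, MapState does not behave this way: adding to the list returned by get() (or to a list that was already put()) does not change the value held in state. For example: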
.process(new KeyedProcessFunction<String, Tuple2<String, Object>, String>() {
    ...
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.days(7))
                .cleanupInBackground()
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // default
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        MapStateDescriptor<String, List<Object>> noMatchDescriptor = new MapStateDescriptor<>(
                "kuduError",
                BasicTypeInfo.STRING_TYPE_INFO,
                TypeInformation.of(new TypeHint<List<Object>>() {})
        );
        ...
    }
    ...
    @Override
    public void processElement(Tuple2<String, Object> stringObjectTuple2, Context context, Collector<String> collector) throws Exception {
        ...
        List<Object> objectList = new ArrayList<>();
        if (mapState.contains(key)) {
            objectList = mapState.get(key);
        } else {
            mapState.put(key, objectList);
        }
        logger.info("key{} add before objectList.size {}", key, objectList.size());
        // Only the local list is modified here; there is no put() afterwards.
        objectList.add(stringObjectTuple2.f1);
        logger.info("key{} add after objectList.size {}", key, objectList.size());
        // Read the value back from state to compare with the local list.
        objectList = mapState.get(key);
        logger.info("key{} mapState objectList.size {}", key, objectList.size());
        ...
    }
Log output:
keyorder2infos add before objectList.size 0
keyorder2infos add after objectList.size 1
keyorder2infos mapState objectList.size 0
keyorder2infos add before objectList.size 0
keyorder2infos add after objectList.size 1
keyorder2infos mapState objectList.size 0
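
One likely explanation, assuming the cluster job uses a state backend that keeps state in serialized form (RocksDB, for example): put() stores a serialized snapshot of the list and get() returns a freshly deserialized copy, so a later add() on the local list never reaches the state. The sketch below imitates that behaviour in plain Java so it can be run without Flink. SerializingMap, addWithoutPut and addWithPut are hypothetical names made up for this illustration, not Flink APIs, and Java serialization stands in for whatever serializer the backend actually uses.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a serializing state backend; not a Flink class.
public class SerializingMapDemo {

    // Keeps every value as bytes, so get() always returns a new copy.
    static class SerializingMap {
        private final Map<String, byte[]> store = new HashMap<>();

        void put(String key, List<Object> value) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(new ArrayList<>(value));
            }
            store.put(key, bytes.toByteArray());
        }

        @SuppressWarnings("unchecked")
        List<Object> get(String key) throws IOException, ClassNotFoundException {
            byte[] data = store.get(key);
            if (data == null) {
                return null;
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
                return (List<Object>) in.readObject();
            }
        }

        boolean contains(String key) {
            return store.containsKey(key);
        }
    }

    // Same pattern as in the question: modify the local list, never write it back.
    static int addWithoutPut(SerializingMap state, String key, Object value) throws Exception {
        List<Object> list = new ArrayList<>();
        if (state.contains(key)) {
            list = state.get(key);
        } else {
            state.put(key, list);
        }
        list.add(value);
        return state.get(key).size();
    }

    // Read, modify, then write the modified list back with put().
    static int addWithPut(SerializingMap state, String key, Object value) throws Exception {
        List<Object> list = state.contains(key) ? state.get(key) : new ArrayList<>();
        list.add(value);
        state.put(key, list);
        return state.get(key).size();
    }

    public static void main(String[] args) throws Exception {
        SerializingMap state = new SerializingMap();
        System.out.println(addWithoutPut(state, "order2infos", "x")); // 0
        System.out.println(addWithoutPut(state, "order2infos", "y")); // 0
        System.out.println(addWithPut(state, "order2infos", "x"));    // 1
        System.out.println(addWithPut(state, "order2infos", "y"));    // 2
    }
}
```

If this is what happens in the job above, the corresponding change in processElement would be to call mapState.put(key, objectList) after objectList.add(stringObjectTuple2.f1), instead of only before it. With a heap-based backend the in-place change may appear to work, but writing the value back explicitly behaves the same on every backend.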