Hi,all This is my code .process(new KeyedProcessFunction<String, Tuple2<String, Object>, String>() { ... @Override public void open(Configuration parameters) throws Exception { super.open(parameters); StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.days(7)) .cleanupInBackground() .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)//default .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build(); MapStateDescriptor noMatchDescriptor = new MapStateDescriptor<String, List<Object>>( "kuduError", BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(new TypeHint<List<Object>>() { }) ); ... } ... @Override public void processElement(Tuple2<String, Object> stringObjectTuple2, Context context, Collector<String> collector) throws Exception { ... List<Object> objectList = new ArrayList<>(); if (mapState.contains(key)) { objectList = mapState.get(key); }else{ mapState.put(key,objectList); } logger.info("key{} add before objectList.size{}",key,objectList.size()); objectList.add(stringObjectTuple2.f1); logger.info("key{} add after objectList.size{}",key,objectList.size()); objectList = mapState.get(key); logger.info("key{} mapState objectList.size{}",key,objectList.size()); ... } If I run this in idea the log detail: Best, Shengjk1 |
Hi Shengjk1, You should call "mapState.put(key,objectList);" manually after calling "objectList.add(stringObjectTuple2.f1);" to write it to the state backend. This is because objectList is just a common Java list object and it will not be synced to state backend automatic when updated. Regards, Dian
|
Free forum by Nabble | Edit this page |