Hi all,

As we know, a Java Map supports updating a value in place: the map holds a reference to the value object, so mutating that object is visible through the map. For example:

    HashMap<String, List<String>> stringListHashMap = new HashMap<>();
    for (int i = 0; i < 10; i++) {
        List<String> a = stringListHashMap.get("a" + i % 2);
        if (a == null) {
            a = new ArrayList<>();
            stringListHashMap.put("a" + i % 2, a);
        }
        a.add("a" + i);
    }
    stringListHashMap.keySet().forEach(x -> System.out.println("keys========= " + x));
    stringListHashMap.get("a0").forEach(x -> System.out.println("========x=== " + x));

Result:

    keys========= a1
    keys========= a0
    ========x=== a0
    ========x=== a2
    ========x=== a4
    ========x=== a6
    ========x=== a8

But when Flink runs on a cluster, MapState does not reflect such in-place updates, for example:

    .process(new KeyedProcessFunction<String, Tuple2<String, Object>, String>() {
        ...
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            StateTtlConfig ttlConfig = StateTtlConfig
                    .newBuilder(Time.days(7))
                    .cleanupInBackground()
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // default
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    .build();
            MapStateDescriptor<String, List<Object>> noMatchDescriptor = new MapStateDescriptor<>(
                    "kuduError",
                    BasicTypeInfo.STRING_TYPE_INFO,
                    TypeInformation.of(new TypeHint<List<Object>>() {})
            );
            ...
        }
        ...
        @Override
        public void processElement(Tuple2<String, Object> stringObjectTuple2, Context context,
                                   Collector<String> collector) throws Exception {
            ...
            List<Object> objectList = new ArrayList<>();
            if (mapState.contains(key)) {
                objectList = mapState.get(key);
            } else {
                mapState.put(key, objectList);
            }
            logger.info("key{} add before objectList.size {}", key, objectList.size());
            objectList.add(stringObjectTuple2.f1);
            logger.info("key{} add after objectList.size {}", key, objectList.size());
            objectList = mapState.get(key);
            logger.info("key{} mapState objectList.size {}", key, objectList.size());
            ...
        }
If I run this code in IDEA, the in-place update works. The log:

    keyorder2infos add before objectList.size 0
    keyorder2infos add after objectList.size 1
    keyorder2infos mapState objectList.size 1
    keyorder2infos add before objectList.size 1
    keyorder2infos add after objectList.size 2
    keyorder2infos mapState objectList.size 2
    …

But if I run it on yarn-cluster, the in-place update is lost. The log:

    keyorder2infos add before objectList.size 0
    keyorder2infos add after objectList.size 1
    keyorder2infos mapState objectList.size 0
    keyorder2infos add before objectList.size 0
    keyorder2infos add after objectList.size 1
    keyorder2infos mapState objectList.size 0
    …

So I want to know:
1. Is this a bug, or is something wrong with my code?
2. Why doesn't MapState on yarn-cluster reflect in-place updates to a value? I have looked at the Flink source code but could not find the reason.

It runs on:
Flink 1.9.0
Java 1.8
Hadoop 2.6.0

Best,
Shengjk1
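Hi Shengjk1,

You should call "mapState.put(key, objectList);" manually after calling "objectList.add(stringObjectTuple2.f1);" to write the list to the state backend. This is because objectList is just an ordinary Java list object, and it is not synced to the state backend automatically when it is updated. The RocksDB state backend stores values as serialized bytes, so every mapState.get(key) returns a freshly deserialized copy, whereas a heap state backend hands back a reference to the same object it stores. I guess the result you see is because the heap state backend is used when running in the IDE and the RocksDB state backend is used when running in YARN mode. You can check the configuration to see whether that's the case.

Regards,
Dian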
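To make the explanation above concrete, here is a minimal plain-Java sketch. `SerializingMapStore` and `addAndRead` are hypothetical names, not Flink classes: the store only imitates how a serializing backend such as RocksDB keeps bytes and hands back a fresh copy on every read. `addAndRead` mirrors the processElement logic from the question, and its `writeBack` flag toggles the suggested `put` after `add`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical demo (not Flink's API): copy-on-read semantics of a serializing store. */
public class SerializedStateDemo {

    /** Keeps each value only as a byte[]; every get() deserializes a new copy. */
    static class SerializingMapStore<K, V extends Serializable> {
        private final Map<K, byte[]> bytes = new HashMap<>();

        void put(K key, V value) throws IOException {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
                out.writeObject(value);
            }
            bytes.put(key, bos.toByteArray());
        }

        @SuppressWarnings("unchecked")
        V get(K key) throws IOException, ClassNotFoundException {
            byte[] b = bytes.get(key);
            if (b == null) {
                return null;
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(b))) {
                return (V) in.readObject();
            }
        }

        boolean contains(K key) {
            return bytes.containsKey(key);
        }
    }

    /** Same steps as the question's processElement; returns the size read back from the store. */
    static int addAndRead(SerializingMapStore<String, ArrayList<Object>> store,
                          String key, Object element, boolean writeBack) throws Exception {
        ArrayList<Object> objectList = new ArrayList<>();
        if (store.contains(key)) {
            objectList = store.get(key);  // a deserialized copy, not the stored object
        } else {
            store.put(key, objectList);
        }
        objectList.add(element);          // mutates only the local copy
        if (writeBack) {
            store.put(key, objectList);   // the fix: write the updated list back
        }
        return store.get(key).size();
    }

    public static void main(String[] args) throws Exception {
        SerializingMapStore<String, ArrayList<Object>> store = new SerializingMapStore<>();
        System.out.println("without put: " + addAndRead(store, "order2infos", "x", false));
        System.out.println("without put: " + addAndRead(store, "order2infos", "y", false));

        SerializingMapStore<String, ArrayList<Object>> fixed = new SerializingMapStore<>();
        System.out.println("with put: " + addAndRead(fixed, "order2infos", "x", true));
        System.out.println("with put: " + addAndRead(fixed, "order2infos", "y", true));
    }
}
```

Running it prints sizes 0 and 0 without the write-back (matching the yarn-cluster log) and 1 and 2 with it (matching the IDEA log). A heap-backed store would instead return the stored object itself, which is why the unmodified code appears to work locally.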
Hi Dian,

Thanks for the reminder. I looked at HeapMapState.java and TtlMapState.java, and you are right. But as objectList grows large, calling "mapState.put(key, objectList);" every time hurts performance badly and can even lead to checkpoint timeouts. For now I don't have a better way to improve performance. Maybe I need to redesign my program.

Best,
Shengjk1

On 09/19/2019 20:08, [hidden email] wrote:
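One likely reason for the slowdown is write amplification: with a serializing backend, each put(key, objectList) re-serializes the entire list, so the total bytes written grow roughly quadratically with the list length, while appending only the new element grows linearly. The sketch below is a hypothetical plain-Java cost model, not Flink code; `bytesRewritingWholeList` and `bytesAppendingOnly` are illustrative names, and Java serialization stands in for Flink's own serializers.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;

/** Hypothetical cost model (not Flink code): bytes serialized per strategy. */
public class WriteAmplificationDemo {

    static int serializedSize(Serializable value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(value);
        }
        return bos.size();
    }

    /** Strategy A: get, add, then put the whole list back on every event. */
    static long bytesRewritingWholeList(int events) throws IOException {
        ArrayList<String> list = new ArrayList<>();
        long total = 0;
        for (int i = 0; i < events; i++) {
            list.add("order-" + i);
            total += serializedSize(list);          // the full list is serialized each time
        }
        return total;
    }

    /** Strategy B: serialize only the newly added element on every event. */
    static long bytesAppendingOnly(int events) throws IOException {
        long total = 0;
        for (int i = 0; i < events; i++) {
            total += serializedSize("order-" + i);  // only the new element is serialized
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        for (int n : new int[] {10, 100, 1000}) {
            System.out.printf("events=%d rewrite=%d bytes, append=%d bytes%n",
                    n, bytesRewritingWholeList(n), bytesAppendingOnly(n));
        }
    }
}
```

If a redesign is possible, two directions may be worth evaluating, assuming they fit the job: keying the stream by the map key and using a ListState, whose add() on the RocksDB backend is implemented as a RocksDB merge rather than a rewrite of the list; or a MapState with one entry per element, so each put touches only that entry. Either way, reading the full collection still deserializes every element.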