Hi all,

I'm reading a simple JSON string as input and keying the stream on two fields, A and B. But keyBy seems to send records that have the same A and different values of B into the same keyed stream.

The input:

    { "A": "352580084349898", "B": "1546559127", "C": "A" }

This is the core logic of my Flink code:

    // Parse each JSON string into a GenericDataObject
    DataStream<GenericDataObject> genericDataObjectDataStream = inputStream
        .map(new MapFunction<String, GenericDataObject>() {
            @Override
            public GenericDataObject map(String s) throws Exception {
                JSONObject jsonObject = new JSONObject(s);
                GenericDataObject genericDataObject = new GenericDataObject();
                genericDataObject.setA(jsonObject.getString("A"));
                genericDataObject.setB(jsonObject.getString("B"));
                genericDataObject.setC(jsonObject.getString("C"));
                return genericDataObject;
            }
        });

    // Key by the composite (A, B) and pass every record through unchanged
    DataStream<GenericDataObject> testStream = genericDataObjectDataStream
        .keyBy("A", "B")
        .map(new MapFunction<GenericDataObject, GenericDataObject>() {
            @Override
            public GenericDataObject map(GenericDataObject genericDataObject) throws Exception {
                return genericDataObject;
            }
        });

    testStream.print();

GenericDataObject is a POJO with three fields: A, B and C.

And this is the console output for different values of field B:

    5> GenericDataObject{A='352580084349898', B='1546559224', C='A'}
    5> GenericDataObject{A='352580084349898', B='1546559127', C='A'}
    4> GenericDataObject{A='352580084349898', B='1546559234', C='A'}
    3> GenericDataObject{A='352580084349898', B='1546559254', C='A'}
    5> GenericDataObject{A='352580084349898', B='1546559224', C='A'}

Notice lines 1, 2 and 3. Even though they have different values of B, they are being put in the same keyed stream (5). I must be doing something fundamentally wrong here.

Thanks,
Harshith
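The `N>` prefix that `print()` writes is the index (starting at 1) of the parallel sink subtask that emitted the record; it does not identify a key. keyBy guarantees that all records with the same (A, B) reach the same subtask, not that each key gets a subtask of its own, so once there are more distinct keys than subtasks, several keys must share one. The sketch below is a simplified, hypothetical model of that assignment: it uses plain `hashCode()` where Flink applies a murmur hash, assumes a max parallelism of 128, and the class and method names are invented for illustration only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of how keyed records are spread over parallel subtasks.
// Not Flink's actual code: Flink hashes key.hashCode() with murmur hash first.
class SubtaskAssignmentSketch {

    // Map a key to a key group, then map the key group to a 0-based subtask index.
    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        int keyGroup = Math.floorMod(key.hashCode(), maxParallelism);
        return keyGroup * parallelism / maxParallelism;
    }

    // Group composite (A, B) keys by the subtask they would be routed to.
    static Map<Integer, List<List<String>>> assign(List<List<String>> keys, int parallelism) {
        Map<Integer, List<List<String>>> bySubtask = new LinkedHashMap<>();
        for (List<String> key : keys) {
            int subtask = subtaskFor(key, 128, parallelism); // 128: assumed max parallelism
            bySubtask.computeIfAbsent(subtask, s -> new ArrayList<>()).add(key);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        String a = "352580084349898";
        List<List<String>> keys = new ArrayList<>();
        // Six distinct (A, B) keys: more keys than the four subtasks used below.
        for (int i = 0; i < 6; i++) {
            keys.add(List.of(a, String.valueOf(1546559200 + i)));
        }
        Map<Integer, List<List<String>>> result = assign(keys, 4);
        // Print with the same 1-based "N> " style that print() uses.
        result.forEach((subtask, ks) -> System.out.println((subtask + 1) + "> " + ks));
    }
}
```

Whatever the exact hash function, six distinct keys spread over four subtasks force at least one subtask to receive two or more different keys, which is the pattern visible in the `5>` lines above. Records that share a key always land together; records that share a subtask do not necessarily share a key.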
Typo: lines 1, 2 and 5
Hi Harshith,

You can replace GenericDataObject with a Tuple3 and keyBy("A", "B") with keyBy(0, 1) (tuple field positions are zero-based, so A and B are positions 0 and 1), then give it a try. You can also see the doc[1] for reference.

Kumar Bolar, Harshith <[hidden email]> wrote on Tue, 29 Jan 2019 at 12:49 PM: