KeyBy is not creating different keyed streams for different keys

HarshithBolar

Hi all,

I'm reading a simple JSON string as input and keying the stream on two fields, A and B. But keyBy puts records that have the same A and different values of B into the same keyed stream.

The input:

{
    "A": "352580084349898",
    "B": "1546559127",
    "C": "A"
}

This is the core logic of my Flink code:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.json.JSONObject;

// Parse each incoming JSON string into a GenericDataObject
DataStream<GenericDataObject> genericDataObjectDataStream = inputStream
        .map(new MapFunction<String, GenericDataObject>() {
            @Override
            public GenericDataObject map(String s) throws Exception {
                JSONObject jsonObject = new JSONObject(s);
                GenericDataObject genericDataObject = new GenericDataObject();
                genericDataObject.setA(jsonObject.getString("A"));
                genericDataObject.setB(jsonObject.getString("B"));
                genericDataObject.setC(jsonObject.getString("C"));
                return genericDataObject;
            }
        });

// Key by the composite (A, B) and pass each record through unchanged
DataStream<GenericDataObject> testStream = genericDataObjectDataStream
        .keyBy("A", "B")
        .map(new MapFunction<GenericDataObject, GenericDataObject>() {
            @Override
            public GenericDataObject map(GenericDataObject genericDataObject) throws Exception {
                return genericDataObject;
            }
        });

testStream.print();

GenericDataObject is a POJO with three fields: A, B and C.
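The class itself was not posted. As a rough sketch, assuming String fields named exactly A, B and C (so that the field expressions in keyBy("A", "B") resolve) and a toString() that reproduces the printed format, it could look like the following. Flink's type extraction treats a class as a POJO when it is public, has a public no-argument constructor, and every field is either public or reachable through a getter and setter.

```java
// Hypothetical reconstruction of the POJO described above; the real class was not posted.
public class GenericDataObject {
    // Field names match the keyBy("A", "B") field expressions.
    private String A;
    private String B;
    private String C;

    // Flink's POJO rules require a public no-argument constructor.
    public GenericDataObject() {}

    // Getters and setters make the private fields accessible to Flink.
    public String getA() { return A; }
    public void setA(String a) { this.A = a; }
    public String getB() { return B; }
    public void setB(String b) { this.B = b; }
    public String getC() { return C; }
    public void setC(String c) { this.C = c; }

    // Same shape as the printed lines: GenericDataObject{A='...', B='...', C='...'}
    @Override
    public String toString() {
        return "GenericDataObject{A='" + A + "', B='" + B + "', C='" + C + "'}";
    }
}
```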

And this is the console output for different values of field B:

5> GenericDataObject{A='352580084349898', B='1546559224', C='A'}
5> GenericDataObject{A='352580084349898', B='1546559127', C='A'}
4> GenericDataObject{A='352580084349898', B='1546559234', C='A'}
3> GenericDataObject{A='352580084349898', B='1546559254', C='A'}
5> GenericDataObject{A='352580084349898', B='1546559224', C='A'}

Notice lines 1, 2 and 3. Even though they have different values of B, they are being put in the same keyed stream (5). I must be doing something fundamentally wrong here.
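The number before ">" is added by print() when the sink runs with a parallelism greater than 1; it is the 1-based index of the parallel subtask that printed the record, not an identifier of a keyed stream. keyBy hashes each key into one of maxParallelism key groups and assigns contiguous ranges of key groups to the parallel subtasks, so once there are more distinct keys than subtasks, different keys must share a subtask. The sketch below mirrors that two-step structure (Flink's own version lives in KeyGroupRangeAssignment), but it is a simplification: it uses a plain floorMod of hashCode() instead of Flink's murmur hash and joins A and B into one string key, so the subtask numbers it prints will not match Flink's. The names KeyToSubtaskSketch, keyGroupFor, subtaskFor and assign are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class KeyToSubtaskSketch {

    // Step 1: key -> key group. Simplified: Flink applies a murmur hash to
    // key.hashCode(); plain floorMod is used here, so results differ from Flink.
    public static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Step 2: key group -> 0-based subtask index (contiguous ranges of key groups).
    public static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Groups distinct keys by the subtask that would process them.
    public static Map<Integer, List<String>> assign(List<String> keys, int maxParallelism, int parallelism) {
        Map<Integer, List<String>> bySubtask = new TreeMap<>();
        for (String key : keys) {
            int subtask = subtaskFor(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
            bySubtask.computeIfAbsent(subtask, s -> new ArrayList<>()).add(key);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        // Composite (A, B) keys from the post, joined with "|" purely for illustration.
        List<String> keys = List.of(
                "352580084349898|1546559224",
                "352580084349898|1546559127",
                "352580084349898|1546559234",
                "352580084349898|1546559254");
        // Assumed values: maxParallelism 128, parallelism 4.
        // Printed prefix is subtask index + 1, like the "5>" in print() output.
        assign(keys, 128, 4).forEach((subtask, ks) ->
                System.out.println((subtask + 1) + "> " + ks));
    }
}
```

With more distinct keys than subtasks, at least two keys always land on the same subtask, which is exactly what the "5>" lines show; each key is still handled separately there.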

Thanks,

Harshith

Re: KeyBy is not creating different keyed streams for different keys

HarshithBolar

Typo: lines 1, 2 and 5
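With the corrected line numbers, lines 1 and 5 carry the same key (B='1546559224') while line 2 has a different B, yet all three were printed by subtask 5. Sharing a subtask does not merge keys: Flink scopes keyed state (for example a ValueState) to the current key, so each (A, B) pair keeps its own state on that subtask. The plain-Java model below is not Flink code; it imitates a per-key counter on one subtask, and the names KeyedStateSketch, ABKey and countPerKey are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedStateSketch {

    // Hypothetical composite key; a Java record derives equals() and hashCode() from (a, b).
    public record ABKey(String a, String b) {}

    // Simplified model of one parallel subtask: state is looked up by key,
    // so keys that share the subtask still keep separate values.
    public static Map<ABKey, Integer> countPerKey(List<ABKey> recordsOnSubtask) {
        Map<ABKey, Integer> state = new HashMap<>();
        for (ABKey key : recordsOnSubtask) {
            state.merge(key, 1, Integer::sum);
        }
        return state;
    }

    public static void main(String[] args) {
        String a = "352580084349898";
        // Lines 1, 2 and 5 of the output, all printed by subtask 5.
        List<ABKey> subtask5 = List.of(
                new ABKey(a, "1546559224"),
                new ABKey(a, "1546559127"),
                new ABKey(a, "1546559224"));
        countPerKey(subtask5).forEach((k, n) -> System.out.println(k + " -> " + n));
    }
}
```

The key with B='1546559224' ends up with a count of 2 and the key with B='1546559127' with a count of 1, even though both were processed by the same subtask.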

From: Harshith Kumar Bolar <[hidden email]>
Date: Tuesday, 29 January 2019 at 10:16 AM
To: "[hidden email]" <[hidden email]>
Subject: KeyBy is not creating different keyed streams for different keys

Re: KeyBy is not creating different keyed streams for different keys

Congxian Qiu
Hi Harshith

You can replace the GenericDataObject with a Tuple3 and keyBy("A", "B") with keyBy(0, 1), then give it a try (tuple positions are zero-based, so A and B are positions 0 and 1 of a Tuple3 ordered (A, B, C)).
You can also see the doc[1] for reference.

On Tue, 29 Jan 2019 at 12:49 PM, Kumar Bolar, Harshith <[hidden email]> wrote:

Typo: lines 1, 2 and 5