Flink - Type Erasure Exception trying to use Tuple6 instead of Tuple

Vijay Balakrishnan
Hi,
I am trying to use a KeyedStream keyed by Tuple to handle different types of Tuples, including Tuple6.
I keep getting this exception:
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Usage of class Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1, Tuple2, etc.) instead.
Is there a way around type erasure here?
I want to declare the stream as KeyedStream<Monitoring, Tuple> so that a Tuple6 key can be treated as a plain Tuple and assigned to monitoringTupleKeyedStream like the keys from the other branches.

Code below:

KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream = null;
String keyOperationType = ....;//provided        
if (StringUtils.isNotEmpty(keyOperationType)) {
    if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_OPERATION)) {
        monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", "gameId", "eventName", "component");
    } else if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_INSTANCE_OPERATION)) {
        monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", "gameId", "eventName", "component", "instance");
    } else if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_KEY_OPERATION)) {
        TypeInformation<Tuple6<String, String, String, String, String, String>> info = TypeInformation.of(new TypeHint<Tuple6<String, String, String, String, String, String>>(){});
        monitoringTupleKeyedStream = kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {
            public Tuple getKey(Monitoring mon) throws Exception {
                String key = "";
                String keyName = "";
                final String eventName = mon.getEventName();
                if (eventName != null && eventName.equalsIgnoreCase(INGRESS_FPS)) {
                    keyName = PCAM_ID;
                    key = mon.getEventDataMap() != null ? (String) mon.getEventDataMap().get(PCAM_ID) : "";
                } else if (eventName != null && (eventName.equalsIgnoreCase(EGRESS_FPS))) {
                    keyName = OUT_BITRATE;
                    key = mon.getEventDataMap() != null ? (String) mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use
                }
                mon.setKeyName(keyName);
                mon.setKeyValue(key);
                return new Tuple6<>(mon.getDeployment(), mon.getGameId(), eventName, mon.getComponent(), mon.getKeyName(), mon.getKeyValue());
            }
        }); //, info)
    } else if (keyOperationType.equalsIgnoreCase(COMPONENT_CONTAINER_OPERATION)) {
        monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", "gameId", "eventName", "component", "instance", "container"); //<== this is also a Tuple6 but no complaints ?
    }
}


The example below requires monitoringTupleKeyedStream to be declared as KeyedStream<Monitoring, Tuple6<String, String, String, String, String, String>>:
TypeInformation<Tuple6<String, String, String, String, String, String>> info = TypeInformation.of(new TypeHint<Tuple6<String, String, String, String, String, String>>(){});
monitoringTupleKeyedStream = kinesisStream.keyBy(new KeySelector<Monitoring, Tuple6<String, String, String, String, String, String>>() {
    @Override
    public Tuple6<String, String, String, String, String, String> getKey(Monitoring mon) throws Exception {
        String key = "";
        String keyName = "";
        //TODO: extract to a method to pull key to use from a config file
        final String eventName = mon.getEventName();
        if (eventName != null && eventName.equalsIgnoreCase(INGRESS_FPS)) {
            keyName = PCAM_ID;
            key = mon.getEventDataMap() != null ? (String) mon.getEventDataMap().get(PCAM_ID) : "";
        } else if (eventName != null && (eventName.equalsIgnoreCase(EGRESS_FPS))) {
            keyName = OUT_BITRATE;
            key = mon.getEventDataMap() != null ? (String) mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use
        }
        mon.setKeyName(keyName);
        mon.setKeyValue(key);
        return new Tuple6<>(mon.getDeployment(), mon.getGameId(), eventName, mon.getComponent(), mon.getKeyName(), mon.getKeyValue());
    }
}, info);

TIA 

Re: Flink - Type Erasure Exception trying to use Tuple6 instead of Tuple

Timothy Victor
Flink needs type information to serialize and deserialize objects, and that information is lost to Java type erasure. The only way to work around this is to explicitly specify the return type of the function called in the lambda.

Fabian's answer here explains it well.
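
To make the erasure point concrete, here is a minimal plain-Java sketch with no Flink on the classpath. `TypeCapture` is a hypothetical helper, not Flink's class; it only imitates the anonymous-subclass trick that the `new TypeHint<...>(){}` calls in this thread appear to rely on. It shows that two generic lists share one runtime class, while an anonymous subclass still carries its type argument in reflection metadata.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

class ErasureSketch {

    // Hypothetical helper (not part of Flink): an anonymous subclass of this
    // class records its type argument in class metadata, where reflection
    // can still read it after erasure.
    public abstract static class TypeCapture<T> {
        private final Type captured;

        protected TypeCapture() {
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            this.captured = superType.getActualTypeArguments()[0];
        }

        public Type getCaptured() {
            return captured;
        }
    }

    // True when both lists have the same runtime class, which means the
    // element types are not visible on the objects themselves.
    public static boolean sameRuntimeClass(List<?> a, List<?> b) {
        return a.getClass() == b.getClass();
    }

    // The trailing {} creates the anonymous subclass that keeps the type.
    public static String capturedListOfString() {
        return new TypeCapture<List<String>>() {}.getCaptured().getTypeName();
    }

    public static void main(String[] args) {
        List<String> strings = new ArrayList<>();
        List<Integer> numbers = new ArrayList<>();
        System.out.println(sameRuntimeClass(strings, numbers));
        System.out.println(capturedListOfString());
    }
}
```

A method declared to return the raw base type gives a framework nothing comparable to inspect, which is why the concrete type has to be supplied some other way.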


Tim

On Tue, Apr 2, 2019, 7:03 PM Vijay Balakrishnan <[hidden email]> wrote:

Re: Flink - Type Erasure Exception trying to use Tuple6 instead of Tuple

Vijay Balakrishnan
Hi Tim,
Thanks for your reply. I don't see an option to specify .returns(new TypeHint<Tuple6<String, String, String, String, String, String>>(){}) on a KeyedStream.
monitoringTupleKeyedStream = kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {
            public Tuple getKey(Monitoring mon) throws Exception {......return new Tuple6<>(..}    })
I tried using  
TypeInformation<Tuple6<String, String, String, String, String, String>> info = TypeInformation.of(new TypeHint<Tuple6<String, String, String, String, String, String>>(){}); 
kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {...}, info); //specify typeInfo through 

TIA,
Vijay 

On Tue, Apr 2, 2019 at 6:06 PM Timothy Victor <[hidden email]> wrote:

Re: Flink - Type Erasure Exception trying to use Tuple6 instead of Tuple

Chesnay Schepler
> I tried using  [ keyBy(KeySelector, TypeInformation) ]

What was the result of this approach?

On 03/04/2019 17:36, Vijay Balakrishnan wrote:

Re: Flink - Type Erasure Exception trying to use Tuple6 instead of Tuple

Timothy Victor
Could this just be solved by creating a POJO model class for your problem?

That is, instead of using Tuple6, create a class that encapsulates your data. This, I think, would solve your problem. Beyond that, I think the code would be more understandable: it's hard to have a Tuple6 of all Strings and remember what each one means, even if I wrote the code :-) Furthermore, if and when you need to add more elements to your data model, you will need to refactor your entire Flink graph. Keeping the data model in a POJO protects against those things.

The latter is just unsolicited code review feedback, and I know I gave it without much context on your problem. So please take it with a large grain of salt, and if it doesn't apply, just ignore it.
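
As a rough sketch of the POJO idea, the six strings could live in a small key class. `MonitoringKey` and its field names are assumptions taken from the getters in the original code, not an existing class. The public no-argument constructor, public fields, and `equals`/`hashCode` follow the usual expectations for a POJO that is used as a key.

```java
import java.util.Objects;

class PojoKeySketch {

    // Hypothetical key class; field names mirror the getters used in the thread.
    public static class MonitoringKey {
        public String deployment;
        public String gameId;
        public String eventName;
        public String component;
        public String keyName;
        public String keyValue;

        // Public no-argument constructor, as POJO-style serializers usually expect.
        public MonitoringKey() {}

        public MonitoringKey(String deployment, String gameId, String eventName,
                             String component, String keyName, String keyValue) {
            this.deployment = deployment;
            this.gameId = gameId;
            this.eventName = eventName;
            this.component = component;
            this.keyName = keyName;
            this.keyValue = keyValue;
        }

        // Keys with the same field values must compare equal and hash alike.
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof MonitoringKey)) {
                return false;
            }
            MonitoringKey other = (MonitoringKey) o;
            return Objects.equals(deployment, other.deployment)
                    && Objects.equals(gameId, other.gameId)
                    && Objects.equals(eventName, other.eventName)
                    && Objects.equals(component, other.component)
                    && Objects.equals(keyName, other.keyName)
                    && Objects.equals(keyValue, other.keyValue);
        }

        @Override
        public int hashCode() {
            return Objects.hash(deployment, gameId, eventName, component, keyName, keyValue);
        }
    }

    public static void main(String[] args) {
        MonitoringKey a = new MonitoringKey("prod", "game-1", "INGRESS_FPS", "encoder", "pcamId", "cam-7");
        MonitoringKey b = new MonitoringKey("prod", "game-1", "INGRESS_FPS", "encoder", "pcamId", "cam-7");
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode());
    }
}
```

With a class like this, the selector could be declared as KeySelector<Monitoring, MonitoringKey>. Every branch would then have to produce the same key class, presumably leaving unused fields empty.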

Tim


On Fri, Apr 5, 2019 at 7:53 AM Chesnay Schepler <[hidden email]> wrote:

Re: Flink - Type Erasure Exception trying to use Tuple6 instead of Tuple

abhishek sharma
I agree with Timothy; a POJO would be a much better approach.

However, if you are trying to build a generic framework in which different streams carry different fields, you can follow the Map approach. For that approach, you need to write an extra mapper class that converts all the fields in the stream into a Map-based stream.
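
A loose sketch of the Map approach in plain Java, under stated assumptions: `toFieldMap`, `buildKey`, the `|` separator, and the sample field names are all hypothetical. In a Flink job the first step would sit in the extra mapper class, and the second would be called from the key selector with whichever field names the operation type needs.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

class MapKeySketch {

    // Hypothetical mapper step: flatten an event into name -> value pairs.
    public static Map<String, String> toFieldMap(String deployment, String gameId,
                                                 String eventName, String component) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("deployment", deployment);
        fields.put("gameId", gameId);
        fields.put("eventName", eventName);
        fields.put("component", component);
        return fields;
    }

    // Joins the values of the requested fields in order; a missing or null
    // field contributes an empty string.
    public static String buildKey(Map<String, String> fields, List<String> keyFields) {
        return keyFields.stream()
                .map(name -> Objects.toString(fields.get(name), ""))
                .collect(Collectors.joining("|"));
    }

    public static void main(String[] args) {
        Map<String, String> fields = toFieldMap("prod", "game-1", "INGRESS_FPS", "encoder");
        System.out.println(buildKey(fields, Arrays.asList("deployment", "gameId", "eventName", "component")));
    }
}
```

Because every variant yields a String, the key type stays the same whichever fields are chosen. The separator is a simplification: values that themselves contain `|` could collide, so a real job would need escaping or a different encoding.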

Abhishek

On Sun, Apr 7, 2019 at 3:07 AM Timothy Victor <[hidden email]> wrote:
Could this just be solved by creating a POJO model class for your problem?

That is, instead of using Tuple6 - create a class that encapsulates your data.   This, I think, would solve your problem.  But beyond that I think the code will be more understandable.  It's hard to have a Tuple6 of all Strings, and remember what each one means -- even if I wrote the code :-)  Furthermore, if and when you need to add more elements to your data model, you will need to refactor your entire Flink graph.   Keeping a data model in POJO protects against those things.

The latter is just unsolicited code review feedback.   And I know I gave it without much context to your problem.  So please take with a large grain of salt, and if it doesn't apply just ignore it.

Tim


On Fri, Apr 5, 2019 at 7:53 AM Chesnay Schepler <[hidden email]> wrote:
> I tried using  [ keyBy(KeySelector, TypeInformation) ]

What was the result of this approach?

On 03/04/2019 17:36, Vijay Balakrishnan wrote:
Hi Tim,
Thanks for your reply. I am not seeing an option to specify a .returns(new TypeHint<Tuple6<String, String,String,String,String,String>>(){}) with KeyedStream ??  
monitoringTupleKeyedStream = kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {
            public Tuple getKey(Monitoring mon) throws Exception {......return new Tuple6<>(..}    })
I tried using  
TypeInformation<Tuple6<String, String, String, String, String, String>> info = TypeInformation.of(new TypeHint<Tuple6<String, String, String, String, String, String>>(){}); 
kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {...}, info); //specify typeInfo through 

TIA,
Vijay 

On Tue, Apr 2, 2019 at 6:06 PM Timothy Victor <[hidden email]> wrote:
Flink needs type information for serializing and deserializing objects, and that is lost due to Java type erasure.   The only way to workaround this is to specify the return type of the function called in the lambda.

Fabian's answer here explains it well.


Tim



Re: Flink - Type Erasure Exception trying to use Tuple6 instead of Tuple

Vijay Balakrishnan
Thanks for all your replies. I solved the problem by sidestepping it: I pre-populate the incoming Monitoring object on intake with the dynamic runtime fields keyName and keyValue, so that I can use the same field-name keyBy call as in all the other if branches:
  monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", "gameId", "eventName", "component", "keyName", "keyValue");

The reason I wanted to use Tuple is that I pass this KeyedStream<Monitoring, Tuple> to a common method that handles the Tuple accordingly.

I tried using keyBy(KeySelector, TypeInformation), but the compiler complained that I need to use <Monitoring, Tuple6> rather than <Monitoring, Tuple> in that particular case.

Vijay

On Sun, Apr 7, 2019 at 5:40 PM abhishek sharma <[hidden email]> wrote:
I agree with Timothy, POJO would be a much better approach.

However, if you are building a generic framework in which different streams carry different fields, you can follow the Map approach. For that approach, you need to write an extra mapper class that converts all the fields in the stream into a Map-based stream.

Abhishek

On Sun, Apr 7, 2019 at 3:07 AM Timothy Victor <[hidden email]> wrote:
Could this just be solved by creating a POJO model class for your problem?

That is, instead of using Tuple6, create a class that encapsulates your data. This, I think, would solve your problem. Beyond that, I think the code will be more understandable: it's hard to work with a Tuple6 of all Strings and remember what each field means, even if I wrote the code :-) Furthermore, if and when you need to add more elements to your data model, you will need to refactor your entire Flink graph. Keeping the data model in a POJO protects against those things.

The latter is just unsolicited code review feedback, and I know I gave it without much context on your problem. So please take it with a large grain of salt, and if it doesn't apply, just ignore it.

Tim
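
As a rough illustration of the POJO suggestion above, a key class with named fields might look like the sketch below. `MonitoringKey` is a hypothetical name; the field names are taken from the keyBy(...) calls in this thread, and the sample values in `main` are invented. As far as I know, Flink only treats a class as a POJO if it is public, has a public no-argument constructor, and exposes its fields publicly or through getters and setters, and a class used as a key should override hashCode() rather than inherit Object.hashCode(). The class is left package-private here only so the sketch stays in one file.

```java
import java.util.Objects;

// Hypothetical key class; field names follow the keyBy(...) calls in the thread.
// To be analyzed as a Flink POJO it would need to be declared public in its own file.
class MonitoringKey {
    public String deployment;
    public String gameId;
    public String eventName;
    public String component;
    public String keyName;
    public String keyValue;

    // No-argument constructor, one of the POJO requirements.
    public MonitoringKey() {}

    public MonitoringKey(String deployment, String gameId, String eventName,
                         String component, String keyName, String keyValue) {
        this.deployment = deployment;
        this.gameId = gameId;
        this.eventName = eventName;
        this.component = component;
        this.keyName = keyName;
        this.keyValue = keyValue;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof MonitoringKey)) return false;
        MonitoringKey other = (MonitoringKey) o;
        return Objects.equals(deployment, other.deployment)
                && Objects.equals(gameId, other.gameId)
                && Objects.equals(eventName, other.eventName)
                && Objects.equals(component, other.component)
                && Objects.equals(keyName, other.keyName)
                && Objects.equals(keyValue, other.keyValue);
    }

    @Override
    public int hashCode() {
        // Computed from field contents only, so equal keys always hash alike.
        return Objects.hash(deployment, gameId, eventName, component, keyName, keyValue);
    }

    public static void main(String[] args) {
        // Invented sample values, for illustration only.
        MonitoringKey a = new MonitoringKey("prod", "g1", "INGRESS_FPS", "encoder", "pcamId", "cam-7");
        MonitoringKey b = new MonitoringKey("prod", "g1", "INGRESS_FPS", "encoder", "pcamId", "cam-7");
        System.out.println("equal keys: " + a.equals(b));
    }
}
```

With a class like this, a KeySelector<Monitoring, MonitoringKey> has a concrete key type, and adding a field later changes one class rather than every Tuple6 signature.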


On Fri, Apr 5, 2019 at 7:53 AM Chesnay Schepler <[hidden email]> wrote:
> I tried using  [ keyBy(KeySelector, TypeInformation) ]

What was the result of this approach?

On 03/04/2019 17:36, Vijay Balakrishnan wrote:
Hi Tim,
Thanks for your reply. I am not seeing an option to specify a .returns(new TypeHint<Tuple6<String, String, String, String, String, String>>(){}) on a KeyedStream.
monitoringTupleKeyedStream = kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {
            public Tuple getKey(Monitoring mon) throws Exception {......return new Tuple6<>(..}    })
I tried using  
TypeInformation<Tuple6<String, String, String, String, String, String>> info = TypeInformation.of(new TypeHint<Tuple6<String, String, String, String, String, String>>(){}); 
kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {...}, info); //specify typeInfo through 

TIA,
Vijay 



Re: Flink - Type Erasure Exception trying to use Tuple6 instead of Tuple

Vijay Balakrishnan
In reply to this post by abhishek sharma
Hi,
I asked this question earlier under the topic "Flink - Type Erasure Exception trying to use Tuple6 instead of Tuple".

I am having trouble declaring a generic Tuple key instead of a specific Tuple1, Tuple2, etc.:
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Usage of class Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1, Tuple2, etc.) instead.

DataStream<Map<String, Object>> kinesisStream = ...;
KeyedStream<Map<String, Object>, Tuple> monitoringTupleKeyedStream = kinesisStream.keyBy(new MapTupleKeySelector(groupBySet));//<===== complains about Tuple type for monitoringTupleKeyedStream
.....

public static class MapTupleKeySelector implements KeySelector<Map<String, Object>, Tuple> {
        private final Set<String> groupBySet;

        public MapTupleKeySelector(Set<String> groupBySet) {
            this.groupBySet = groupBySet;
        }

        @Override
        public Tuple getKey(Map<String, Object> inputMap) throws Exception {
            int groupBySetSize = groupBySet.size();
            Tuple tuple = Tuple.newInstance(groupBySetSize);
            //Tuple1 tuple = new Tuple1();
            int count = 0;
            for (String groupBy : groupBySet) {
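                // Assumed lookup: the line defining groupByValue is missing from the post as sent.
                Object groupByValue = inputMap.get(groupBy);
                tuple.setField(groupByValue, count++);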
            }
            return tuple;
        }
    }

Abhishek had replied in the thread as follows (I am posting in that thread as well as creating a new thread):
However, if you are building a generic framework in which different streams carry different fields, you can follow the Map approach. For that approach, you need to write an extra mapper class that converts all the fields in the stream into a Map-based stream.

Can I get an example of how to create this extra Mapper class?

Currently, I deserialize the incoming byte[] by implementing KinesisDeserializationSchema<Map<String, Object>>, which gives me a DataStream<Map<String, Object>> kinesisStream.

TIA,
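
The thread never shows the mapper class itself, so the following is only one possible reading of the "Map approach": a small class that copies the fields of a typed record into a Map<String, Object>. `MonitoringEvent`, `MonitoringToMapMapper` and `MapperDemo` are hypothetical names, only three fields are shown, and the sample values are invented. To keep the sketch free of dependencies it implements java.util.function.Function; in a Flink job the same body would presumably sit in a MapFunction<MonitoringEvent, Map<String, Object>> passed to DataStream.map(...).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the Monitoring event; only three fields are shown.
class MonitoringEvent {
    final String deployment;
    final String gameId;
    final String eventName;

    MonitoringEvent(String deployment, String gameId, String eventName) {
        this.deployment = deployment;
        this.gameId = gameId;
        this.eventName = eventName;
    }
}

// Copies every field of the record into a map keyed by field name.
class MonitoringToMapMapper implements Function<MonitoringEvent, Map<String, Object>> {
    @Override
    public Map<String, Object> apply(MonitoringEvent event) {
        // LinkedHashMap keeps insertion order, which makes the output predictable.
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("deployment", event.deployment);
        fields.put("gameId", event.gameId);
        fields.put("eventName", event.eventName);
        return fields;
    }
}

class MapperDemo {
    public static void main(String[] args) {
        // Invented sample values, for illustration only.
        MonitoringEvent event = new MonitoringEvent("prod", "g1", "INGRESS_FPS");
        System.out.println(new MonitoringToMapMapper().apply(event));
    }
}
```

If the deserialization schema already emits Map<String, Object>, as described above, a separate mapper like this may not be needed at all.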



Re: Flink - Type Erasure Exception trying to use Tuple6 instead of Tuple

Vijay Balakrishnan
I solved the problem by following another person's recommendation on the other post about using a wrapper POJO.
I used a wrapper class, MonitoringTuple, to wrap the Tuple, and that solved my problem with the varying number of fields behind the Tuple type.

public class MonitoringTuple {
    private Tuple tuple;
    // The rest of the class (the constructor taking a Tuple, accessors) is not shown in the post.
}

Then, I used it like this:
KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream = kinesisStream.keyBy(new MapTupleKeySelector(groupBySet));

The MapTupleKeySelector is defined below:
public static class MapTupleKeySelector implements KeySelector<Map<String, Object>, MonitoringTuple> {
        private final Set<String> groupBySet;

        public MapTupleKeySelector(Set<String> groupBySet) {
            this.groupBySet = groupBySet;
        }

        @Override
        public MonitoringTuple getKey(Map<String, Object> inputMap) {
            int groupBySetSize = groupBySet.size();
            Tuple tuple = Tuple.newInstance(groupBySetSize);
            int count = 0;
            for (String groupBy : groupBySet) {
                count = setTupleField(inputMap, tuple, count, groupBy);
            }
            return new MonitoringTuple(tuple);
        }
    }

    public static int setTupleField(Map<String, Object> inputMap, Tuple tuple, int count, String groupBy) {
        Object groupByValueObj = inputMap.get(groupBy);
        String tupleValue = Utils.convertToString(groupByValueObj);
        tuple.setField(tupleValue, count++);
        return count;
    }
}
 
 
TIA,
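
The post does not show whether MonitoringTuple overrides equals() and hashCode(). As far as I know, Flink expects key objects to have a deterministic hashCode(), so a wrapper key probably needs both. The sketch below shows the idea in plain Java with no Flink dependency: `GroupKey` and `GroupKeyDemo` are hypothetical names, a List<String> stands in for Flink's Tuple, and the sample values are invented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical variable-arity key; a List<String> stands in for Flink's Tuple.
final class GroupKey {
    private final List<String> values;

    GroupKey(List<String> values) {
        this.values = new ArrayList<>(values); // defensive copy
    }

    List<String> getValues() {
        return values;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof GroupKey && values.equals(((GroupKey) o).values);
    }

    @Override
    public int hashCode() {
        // Depends only on the string contents, so equal keys hash alike in every JVM.
        return values.hashCode();
    }
}

class GroupKeyDemo {
    // Mirrors the shape of MapTupleKeySelector.getKey: one component per group-by field.
    static GroupKey keyFor(Map<String, Object> input, List<String> groupByFields) {
        List<String> values = new ArrayList<>();
        for (String field : groupByFields) {
            values.add(String.valueOf(input.get(field))); // a missing field becomes "null"
        }
        return new GroupKey(values);
    }

    public static void main(String[] args) {
        // Invented sample values, for illustration only.
        Map<String, Object> event = new HashMap<>();
        event.put("deployment", "prod");
        event.put("gameId", "g1");
        GroupKey key = keyFor(event, Arrays.asList("deployment", "gameId"));
        System.out.println(key.getValues());
    }
}
```

The sketch takes the group-by fields as a List rather than a Set so that the order of key components is explicit; with a Set, an implementation with a defined iteration order such as LinkedHashSet would serve the same purpose.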
