I’m sure this should work, but I’m missing something… I searched the archive first, but didn’t have much luck finding any insights there.

TL;DR: org.apache.flink.api.common.InvalidProgramException: This type (GenericType<com.newrelic.flink_aggregator.AggregatableTimeslice>) cannot be used as key.

I’m just getting started with a 1.0 implementation of a new task. It’s a pretty straightforward reduce job, but I’m running into a snag with creating a KeyedStream.

Here’s the graph:

    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    DataStream<TimesliceData> dataStream = see.addSource(
        new FlinkKafkaConsumer08<>(
            timesliceConstants.RESOLVED_TIMESLICE_DATA_KAFKA_TOPIC_NAME,
            new TimesliceDeserializer(),
            kafkaConsumerProperties));

    SingleOutputStreamOperator<AggregatableTimeslice> flattenedDataStream = dataStream
        .assignTimestampsAndWatermarks(new TimesliceTimestampExtractor())
        .flatMap(new TimesliceMapper());

    flattenedDataStream
        .keyBy("accountId", "agentId", "wideMetricId")
        .timeWindow(Time.seconds(60))
        .reduce(AggregatableTimeslice::aggregateWith)
        .print();

This fails on keyBy() with the message:
TimesliceMapper is a concrete implementation of FlatMapFunction<TimesliceData, AggregatableTimeslice>, namely
AggregatableTimesliceImpl is a simple concrete implementation of the AggregatableTimeslice interface: public interface AggregatableTimeslice {
Hi Ron,

not every class can be used as a key in `keyBy`. For your case in particular, it looks like you have to implement Comparable so that Flink can correctly key your stream based on AggregatableTimesliceImpl.

Take a look at the first slides here for more information on keying: http://dataartisans.github.io/flink-training/dataStreamAdvanced/slides.html

Hope this helps.

On Sat, Mar 12, 2016 at 9:01 PM, Ron Crocker <[hidden email]> wrote:
BR, Stefano Baghino
Thanks Stefano -
That helped, but just led to different pain. I think I need to reconsider how I treat these things. Alas, the subject of a different thread.
Hey Ron, for accessing keys of a class by their field name (like you did: .keyBy("accountId", "agentId", "wideMetricId")), the class needs to be recognized as a POJO by Flink. From the documentation [1] a class is recognized as a POJO when:
In your case, I believe the class you are trying to use as a POJO is AggregatableTimeslice, and that's only an interface (hence Flink treats it as a GenericType, as you can see from the exception). What you are trying to achieve should work when you make the interface a class.

Another option, and the one I would recommend in your situation, is to use a KeySelector function. It's basically a callback that returns the key fields from your type. Since you are using Java 8 anyway, it's not going to take a lot of boilerplate to implement the KeySelector.

I would also recommend registering the implementations of the AggregatableTimeslice interface with Kryo for better performance: env.registerType(AggregatableTimesliceImpl.class); This will make the serialization with Kryo much faster.

Regards,

On Sun, Mar 13, 2016 at 4:04 AM, Ron Crocker <[hidden email]> wrote:
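As a rough illustration of the KeySelector idea described above, here is a minimal sketch in plain Java. It is not code from this thread. The Slice class, its long/double field types, the total field, and the "a:b:c" string key format are all assumptions made for the example. The KeySelector interface declared here is a local stand-in with the same shape as Flink's org.apache.flink.api.java.functions.KeySelector, so that the sketch compiles without Flink on the classpath; in a real job the selector would be passed to keyBy(...) instead of the field names.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeySelectorSketch {

    // Local stand-in mirroring the shape of Flink's KeySelector<IN, KEY>.
    // Declared here only so the sketch runs without Flink (assumption).
    @FunctionalInterface
    interface KeySelector<IN, KEY> {
        KEY getKey(IN value) throws Exception;
    }

    // Hypothetical record type. The three id names come from the thread;
    // their types and the "total" field are assumptions.
    static final class Slice {
        final long accountId;
        final long agentId;
        final long wideMetricId;
        final double total;

        Slice(long accountId, long agentId, long wideMetricId, double total) {
            this.accountId = accountId;
            this.agentId = agentId;
            this.wideMetricId = wideMetricId;
            this.total = total;
        }

        // Combines two slices that share the same key.
        Slice aggregateWith(Slice other) {
            return new Slice(accountId, agentId, wideMetricId, total + other.total);
        }
    }

    // The callback: returns a composite key built from the three id fields.
    static final KeySelector<Slice, String> BY_ACCOUNT_AGENT_METRIC =
            s -> s.accountId + ":" + s.agentId + ":" + s.wideMetricId;

    // Groups a finite list by key and reduces each group, which is roughly
    // what keyBy(...).reduce(...) does per window on a stream.
    static Map<String, Slice> reduceByKey(List<Slice> input,
                                          KeySelector<Slice, String> selector)
            throws Exception {
        Map<String, Slice> result = new LinkedHashMap<>();
        for (Slice s : input) {
            result.merge(selector.getKey(s), s, Slice::aggregateWith);
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        List<Slice> input = Arrays.asList(
                new Slice(1, 2, 3, 1.5),
                new Slice(1, 2, 3, 2.5),
                new Slice(1, 2, 4, 1.0));
        for (Map.Entry<String, Slice> e
                : reduceByKey(input, BY_ACCOUNT_AGENT_METRIC).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue().total);
        }
    }
}
```

Because the key is computed by a function rather than looked up by field name, the element type no longer has to be recognized as a POJO; the selector only needs some way to read the three values, for example through getters on the interface.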